Ignore:
Timestamp:
Mar 19, 2015, 10:01:11 AM (9 years ago)
Author:
david
Message:

commit of partial async queue process management

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/PublicaMundi_David-devel/zoo-project/zoo-kernel/zoo_loader.c

    r553 r617  
    6767
    6868#include "service_zcfg.h"
     69#include "zoo_json.h"
     70#include "zoo_amqp.h"
     71#include "zoo_sql.h"
    6972//#include "service_internal.h"
     73
     74
     75void
     76loadServiceAndRun (maps ** myMap, service * s1, map * request_inputs,
     77                   maps ** inputs, maps ** ioutputs, int *eres,FCGX_Stream *out, FCGX_Stream *err);
    7078
    7179xmlXPathObjectPtr extractFromDoc (xmlDocPtr, const char *);
     
    100108      int pid = getpid();
    101109      struct cgi_env *cgi;
    102       //PrintEnv(request.err, "Request environment", request.envp);
    103110      cgi = (struct cgi_env*)malloc(sizeof(struct cgi_env));
    104111      cgiMain_init (NULL, NULL,&cgi,request);
     
    455462      if (strQuery != NULL)
    456463        free (strQuery);
    457 
     464/*
     465      json_object *obj;
     466      maptojson(&obj,tmpMap);
     467      fprintf(stderr,"%s\n",json_object_to_json_string(obj));
     468      fflush(stderr);
     469  */   
    458470      runRequest (&tmpMap,&cgi,request);
    459471
     
    525537    return 1;
    526538  }
    527 
     539/*
     540 json_object *jobj;
     541  mapstojson(&jobj,conf);
     542  fprintf (stderr,"The json object created: %s\n",json_object_to_json_string(jobj));
     543    freeMaps(&conf);
     544 
     545  maps *conf_tmp;
     546  jsontomaps(jobj,&conf_tmp);
     547  dumpMaps(conf_tmp);
     548   return 1;
     549*/
    528550  char *rootDir;
    529551  map *m_rootDir = getMapFromMaps (conf, "server", "rootDir");
     
    684706    }
    685707  }
     708
     709
     710  char * amqp_host;
     711  map * m_amqp_host = getMapFromMaps (conf, "rabbitmq", "host");
     712  if (m_amqp_host == NULL){
     713    fprintf(stderr,"Configuration error: [rabbitmq] host");
     714    return 2;
     715  }
     716  else {
     717    amqp_host = (char *)malloc((strlen(m_amqp_host->value) +1)*sizeof(char*));
     718    strncpy(amqp_host,m_amqp_host->value,strlen(m_amqp_host->value));
     719    amqp_host[strlen(m_amqp_host->value)] = '\0';
     720 }
     721
     722  int amqp_port;
     723  map *m_amqp_port = getMapFromMaps (conf, "rabbitmq", "port");
     724  if (m_amqp_port == NULL){
     725    fprintf(stderr,"Configuration error: [rabbitmq] port");
     726    return 2;
     727  }
     728  else {
     729    amqp_port = atoi(m_amqp_port->value);
     730    if (amqp_port == 0){
     731        fprintf(stderr,"Configuration error: [rabbitmq] port");
     732        return 2;
     733    }
     734  }
     735
     736  char * amqp_user;
     737  map * m_amqp_user = getMapFromMaps (conf, "rabbitmq", "user");
     738  if (m_amqp_user == NULL){
     739    fprintf(stderr,"Configuration error: [rabbitmq] user");
     740    return 2;
     741  }
     742  else {
     743    amqp_user = (char *)malloc((strlen(m_amqp_user->value) +1)*sizeof(char*));
     744    strncpy(amqp_user,m_amqp_user->value,strlen(m_amqp_user->value));
     745    amqp_user[strlen(m_amqp_user->value)] = '\0';
     746 }
     747
     748  char * amqp_passwd;
     749  map * m_amqp_passwd = getMapFromMaps (conf, "rabbitmq", "passwd");
     750  if (m_amqp_passwd == NULL){
     751    fprintf(stderr,"Configuration error: [rabbitmq] passwd");
     752    return 2;
     753  }
     754  else {
     755    amqp_passwd = (char *)malloc((strlen(m_amqp_passwd->value) +1)*sizeof(char*));
     756    strncpy(amqp_passwd,m_amqp_passwd->value,strlen(m_amqp_passwd->value));
     757    amqp_passwd[strlen(m_amqp_passwd->value)] = '\0';
     758 }
     759
     760  char * amqp_exchange;
     761  map * m_amqp_exchange = getMapFromMaps (conf, "rabbitmq", "exchange");
     762  if (m_amqp_exchange == NULL){
     763    fprintf(stderr,"Configuration error: [rabbitmq] exchange");
     764    return 2;
     765  }
     766  else {
     767    amqp_exchange = (char *)malloc((strlen(m_amqp_exchange->value) +1)*sizeof(char*));
     768    strncpy(amqp_exchange,m_amqp_exchange->value,strlen(m_amqp_exchange->value));
     769    amqp_exchange[strlen(m_amqp_exchange->value)] = '\0';
     770 }
     771
     772  char * amqp_routingkey;
     773  map * m_amqp_routingkey = getMapFromMaps (conf, "rabbitmq", "routingkey");
     774  if (m_amqp_routingkey == NULL){
     775    fprintf(stderr,"Configuration error: [amqp] routingkey");
     776    return 2;
     777  }
     778  else {
     779    amqp_routingkey = (char *)malloc((strlen(m_amqp_routingkey->value) +1)*sizeof(char*));
     780    strncpy(amqp_routingkey,m_amqp_routingkey->value,strlen(m_amqp_routingkey->value));
     781    amqp_routingkey[strlen(m_amqp_routingkey->value)] = '\0';
     782 }
     783
     784  char * amqp_queue;
     785  map * m_amqp_queue = getMapFromMaps (conf, "rabbitmq", "queue");
     786  if (m_amqp_queue == NULL){
     787    fprintf(stderr,"Configuration error: [rabbitmq] queue");
     788    return 2;
     789  }
     790  else {
     791    amqp_queue = (char *)malloc((strlen(m_amqp_queue->value) +1)*sizeof(char*));
     792    strncpy(amqp_queue,m_amqp_queue->value,strlen(m_amqp_queue->value));
     793    amqp_queue[strlen(m_amqp_queue->value)] = '\0';
     794 }
     795
     796  char * status_user;
     797  map * m_status_user = getMapFromMaps (conf, "status", "user");
     798  if (m_status_user == NULL){
     799    fprintf(stderr,"Configuration error: [status] user");
     800    return 2;
     801  }
     802  else {
     803    status_user = (char *)malloc((strlen(m_status_user->value) +1)*sizeof(char*));
     804    strncpy(status_user,m_status_user->value,strlen(m_status_user->value));
     805    status_user[strlen(m_status_user->value)] = '\0';
     806  }
     807
     808
     809  char * status_passwd;
     810  map * m_status_passwd = getMapFromMaps (conf, "status", "passwd");
     811  if (m_status_passwd == NULL){
     812    fprintf(stderr,"Configuration error: [status] passwd");
     813    return 2;
     814  }
     815  else {
     816    status_passwd = (char *)malloc((strlen(m_status_passwd->value) +1)*sizeof(char*));
     817    strncpy(status_passwd,m_status_passwd->value,strlen(m_status_passwd->value));
     818    status_passwd[strlen(m_status_passwd->value)] = '\0';
     819  }
     820
     821  char * status_bdd;
     822  map * m_status_bdd = getMapFromMaps (conf, "status", "bdd");
     823  if (m_status_bdd == NULL){
     824    fprintf(stderr,"Configuration error: [status] bdd");
     825    return 2;
     826  }
     827  else {
     828    status_bdd = (char *)malloc((strlen(m_status_bdd->value) +1)*sizeof(char*));
     829    strncpy(status_bdd,m_status_bdd->value,strlen(m_status_bdd->value));
     830    status_bdd[strlen(m_status_bdd->value)] = '\0';
     831  }
     832
     833  char * status_host;
     834  map * m_status_host = getMapFromMaps (conf, "status", "host");
     835  if (m_status_host == NULL){
     836    fprintf(stderr,"Configuration error: [status] host");
     837    return 2;
     838  }
     839  else {
     840    status_host = (char *)malloc((strlen(m_status_host->value) +1)*sizeof(char*));
     841    strncpy(status_host,m_status_host->value,strlen(m_status_host->value));
     842    status_host[strlen(m_status_host->value)] = '\0';
     843  }
     844
     845  int status_port;
     846  map *m_status_port = getMapFromMaps (conf, "status", "port");
     847  if (m_status_port == NULL){
     848    fprintf(stderr,"Configuration error: [status] port");
     849    return 2;
     850  }
     851  else {
     852    status_port = atoi(m_status_port->value);
     853    if (status_port == 0){
     854        fprintf(stderr,"Configuration error: [status] port");
     855        return 2;
     856    }
     857  }
     858  init_sql(status_host,status_user,status_passwd,status_bdd,status_port);
    686859
    687860  int sock = FCGX_OpenSocket(listen, listen_queue);
     
    705878    return 3;
    706879  }
    707  
     880
     881  init_amqp(amqp_host,amqp_port,amqp_user, amqp_passwd, amqp_exchange, amqp_routingkey,amqp_queue);
     882
     883
    708884  int fork_status = fork();
    709885  if (fork_status == 0){
    710886    //child
    711     int forker_pid = getpid();
     887    int master_sync= getpid();
     888    fprintf(stderr,"Master sync%d\n",getpid());
    712889    FCGX_Init();
    713890    FCGX_Request request;
     
    718895        fork_status = fork();
    719896        if (fork_status == 0){
    720             fprintf(stderr,"child %d \n",i);
     897            fprintf(stderr,"child sync %d \n",getpid());
    721898            fflush(stderr);
    722899            break;
     
    724901    }
    725902    while(1){
    726         if (forker_pid != getpid()){
     903        /* mode synchrone */
     904        if (master_sync != getpid()){
    727905            while(FCGX_Accept_r(&request) == 0){
    728906                process(&request);
     
    737915        else {
    738916            wait(0);
    739             fprintf(stderr,"new child\n");
     917            fprintf(stderr,"Master sync %d\n",getpid());
     918            fprintf(stderr,"New sync Child\n");
    740919            fflush(stderr);
    741920            fork();
     
    744923  }
    745924  else {
    746  
    747   while(1);
     925   int master_async = getpid();
     926   fprintf(stderr,"Master async %d\n",master_async);
     927    int fork_s;
     928    int j;
     929    for (j = 0; j< async_worker; j++){
     930        fork_s = fork();
     931        if (fork_s == 0){
     932            fprintf(stderr,"child async %d \n",getpid());
     933            fflush(stderr);
     934            break;
     935        }
     936    }
     937    json_object *msg_obj;
     938    json_object *maps_obj;
     939    maps * map_c;
     940    json_object *req_format_jobj;
     941    maps * request_input_real_format;
     942    json_object *req_jobj;
     943    map * request_inputs;
     944    json_object *outputs_jobj;
     945    maps * request_output_real_format;
     946
     947    char *msg;
     948    int c;
     949    int eres;
     950    char * service_identifier;
     951    service * s1 = NULL;
     952    while(1){
     953        /* mode asynchrone */
     954        if( master_async != getpid()){
     955            /*traitement des requetes de la queue */
     956            bind_amqp();
     957            init_consumer();
     958            while(1){
     959               
     960                c = consumer_amqp(&msg);
     961                if (c == 0)
     962                    break;
     963                msg_obj = json_tokener_parse(msg);
     964               
     965                free(msg);
     966                maps_obj = json_object_object_get(msg_obj,"maps");
     967
     968                map_c = jsontomaps(maps_obj);
     969
     970                req_format_jobj = json_object_object_get(msg_obj,"request_input_real_format");
     971                request_input_real_format = jsontomaps(req_format_jobj);
     972
     973                req_jobj = json_object_object_get(msg_obj,"request_inputs");
     974                request_inputs = jsontomap(req_jobj);
     975
     976                outputs_jobj = json_object_object_get(msg_obj,"request_output_real_format");
     977                request_output_real_format = jsontomaps(outputs_jobj);
     978               
     979                json_object_put(msg_obj);
     980
     981                /* traitemement du message */
     982                /* Recherche des references */
     983                maps* tmp=request_input_real_format;
     984                HINTERNET hInternet = InternetOpen ((LPCTSTR) "ZooWPSClient\0",INTERNET_OPEN_TYPE_PRECONFIG, NULL, NULL, 0);
     985                while(tmp!=NULL){
     986                    map * tmp_map = getMap(tmp->content,"xlink:href");
     987                    if (tmp_map != NULL){
     988                        if (loadRemoteFile(&map_c, &tmp_map, &hInternet,tmp_map->value) < 0) {
     989                            /* passer le status failed dans la base de donnée */
     990                            fprintf(stderr,"Erreur de chargement \n");
     991                        }
     992                    }
     993                    tmp=tmp->next;
     994                }
     995                runHttpRequests (&map_c, &request_input_real_format, &hInternet);
     996                InternetCloseHandle (&hInternet);
     997                free(tmp);
     998                map * uuid = getMapFromMaps(map_c,"lenv","usid");
     999                if (uuid != NULL)
     1000                    start_job(uuid->value);
     1001                map *t=createMap("background","1");
     1002                maps * lenv = getMaps(map_c,"lenv");
     1003                addMapToMap(&lenv->content,t);
     1004                freeMap(&t);
     1005                free(t);
     1006               
     1007                map * m_identifier = getMap (request_inputs, "Identifier");
     1008               
     1009                service_identifier = zStrdup (m_identifier->value);
     1010
     1011                s1 = search_service (service_identifier);
     1012                free(service_identifier);
     1013               
     1014
     1015                //dumpMaps(request_input_real_format);
     1016
     1017                loadServiceAndRun(&map_c, s1,request_inputs,&request_input_real_format, &request_output_real_format, &eres,NULL,NULL);
     1018                if (eres == SERVICE_SUCCEEDED) {
     1019                    outputResponse (s1,request_input_real_format,request_output_real_format,request_inputs, 0, map_c, eres,NULL,NULL);
     1020                }
     1021                   
     1022                   
     1023                //dumpMaps(request_output_real_format);
     1024                //fprintf(stderr,"################################################################\n");
     1025                //dumpMaps(map_c);
     1026
     1027                outputResponse (s1,request_input_real_format,request_output_real_format,request_inputs, 0, map_c, eres,NULL,NULL);
     1028
     1029               
     1030                freeMaps(&map_c);
     1031                map_c= NULL;
     1032               
     1033                freeMaps(&request_input_real_format);
     1034                request_input_real_format = NULL;
     1035
     1036                //dumpMap(request_inputs);
     1037                freeMap(&request_inputs);
     1038                request_inputs = NULL;
     1039               
     1040                //dumpMaps(request_output_real_format);
     1041                freeMaps(&request_output_real_format);
     1042                request_output_real_format = NULL;
     1043                consumer_ack_amqp(c);
     1044               
     1045
     1046            }
     1047            close_amqp();
     1048           
     1049
     1050
     1051
     1052           
     1053        }
     1054        else {
     1055            wait(0);
     1056            fprintf(stderr,"Master async %d\n",getpid());
     1057            fprintf(stderr,"New async Child\n");
     1058            fflush(stderr);
     1059            fork();
     1060        }
     1061    }
    7481062
    7491063  }
Note: See TracChangeset for help on using the changeset viewer.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png