source: branches/PublicaMundi_David-devel/zoo-project/zoo-kernel/zoo_loader.c @ 617

Last change on this file since 617 was 617, checked in by david, 9 years ago

commit of partial async queue process management

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc
File size: 33.9 KB
Line 
1/**
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2008-2011 GeoLabs SARL. All rights reserved.
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 * THE SOFTWARE.
23 */
24
25#define MALLOC_CHECK_ 0
26#define MALLOC_CHECK 0
27
28/**
29 * Specific includes
30 */
31#ifndef WIN32
32/*
33#include "fcgio.h"
34#include "fcgi_config.h"
35#include "fcgi_stdio.h"
36*/
37#include <unistd.h>
38#include <fcgiapp.h>
39#endif
40#include <sys/wait.h>
41#include <pthread.h>
42#include <sys/types.h>
43#include <unistd.h>
44#include "service_internal.h"
45#ifdef WIN32
46#include "windows.h"
47#define strtok_r strtok_s
48#endif
49
50extern "C"
51{
52#include "cgic.h"
53#include <libxml/tree.h>
54#include <libxml/xmlmemory.h>
55#include <libxml/parser.h>
56#include <libxml/xpath.h>
57#include <libxml/xpathInternals.h>
58
59#include <string.h>
60#include <stdio.h>
61#include <stdlib.h>
62#include <glib.h>
63#include <sys/stat.h>
64#include <ctype.h>
65
66}
67
68#include "service_zcfg.h"
69#include "zoo_json.h"
70#include "zoo_amqp.h"
71#include "zoo_sql.h"
72//#include "service_internal.h"
73
74
75void
76loadServiceAndRun (maps ** myMap, service * s1, map * request_inputs,
77                   maps ** inputs, maps ** ioutputs, int *eres,FCGX_Stream *out, FCGX_Stream *err);
78
79xmlXPathObjectPtr extractFromDoc (xmlDocPtr, const char *);
80int runRequest (map **,struct cgi_env **,FCGX_Request *);
81
82using namespace std;
83
84#ifndef TRUE
85#define TRUE 1
86#endif
87#ifndef FALSE
88#define FALSE -1
89#endif
90
91
92static void PrintEnv(FCGX_Stream *out, char *label, char **envp)
93{
94    FCGX_FPrintF(out, "%s:<br>\n<pre>\n", label);
95    for( ; *envp != NULL; envp++) {
96        FCGX_FPrintF(out, "%s\n", *envp);
97    }
98    FCGX_FPrintF(out, "</pre><p>\n");
99}
100
101#define PATH_SOCKET "/tmp/zoo.sock"
102#define THREAD_COUNT 50
103static int counts[THREAD_COUNT];
104
105
106int process(FCGX_Request *request){
107
108      int pid = getpid();
109      struct cgi_env *cgi;
110      cgi = (struct cgi_env*)malloc(sizeof(struct cgi_env));
111      cgiMain_init (NULL, NULL,&cgi,request);
112      char *strQuery = NULL;
113      if (cgi->cgiQueryString != NULL)
114        strQuery = zStrdup (cgi->cgiQueryString);
115      map *tmpMap = NULL;
116
117      if (strncmp (cgi->cgiContentType, "text/xml", 8) == 0 ||
118          strncasecmp (cgi->cgiRequestMethod, "post", 4) == 0)
119        {
120          if (cgi->cgiContentLength == 0)
121            {
122              char *post_data = NULL;
123              int i = 0;
124              int ch = FCGX_GetChar(request->in);
125              while (ch != -1){
126                {
127                  i++;
128                  if (post_data == NULL)
129                    {
130                    post_data=(char*)malloc(sizeof(char));
131                    post_data[i-1] = (char) ch;
132                    }
133                  else
134                    {
135                    post_data=(char*)realloc(post_data,i*sizeof(char));
136                    post_data[i-1] = (char) ch;
137                    }
138                  ch = FCGX_GetChar(request->in);
139                  if (ch == -1 ){
140                    post_data=(char*)realloc(post_data,(i + 1)*sizeof(char));
141                    post_data[i] = '\0';
142                    }
143                }
144                cgi->cgiContentLength = i;
145              if (post_data == NULL && (strQuery == NULL || strlen (strQuery) == 0))
146                {
147                  return errorException (NULL,
148                                         "ZOO-Kernel failed to process your request cause the request was emtpty.",
149                                         "InternalError", NULL,request->out);
150                }
151              else
152                {
153                  if (strQuery == NULL || strlen (strQuery) == 0)
154                    tmpMap = createMap ("request", post_data);
155                }
156              if (post_data != NULL)
157                free (post_data);
158                }
159            }
160          else
161            {
162              char *post_data = new char[cgi->cgiContentLength + 1];
163              int r = FCGX_GetStr(post_data,cgi->cgiContentLength,request->in);
164              if ( r > 0)
165                {
166                  post_data[r] = '\0';
167                  cgi->cgiContentLength = r;
168                  tmpMap = createMap ("request", post_data);
169                }
170              else
171                {
172                  post_data[0] = '\0';
173                  char **array, **arrayStep;
174                  if (cgiFormEntries (&array,&cgi) != cgiFormSuccess)
175                    {
176                      return 1;
177                    }
178                  arrayStep = array;
179                  while (*arrayStep)
180                    {
181                      char *ivalue = new char[cgi->cgiContentLength];
182                      cgiFormStringNoNewlines (*arrayStep, ivalue,
183                                               cgi->cgiContentLength,&cgi);
184                      char *tmpValueFinal =
185                        (char *)
186                        malloc ((strlen (*arrayStep) + strlen (ivalue) +
187                                 1) * sizeof (char));
188                      sprintf (tmpValueFinal, "%s=%s", *arrayStep, ivalue);
189                      if (strlen (post_data) == 0)
190                        {
191                          sprintf (post_data, "%s", tmpValueFinal);
192                        }
193                      else
194                        {
195                          char *tmp = zStrdup (post_data);
196                          sprintf (post_data, "%s&%s", tmp, tmpValueFinal);
197                          free (tmp);
198                        }
199                      free (tmpValueFinal);
200#ifdef DEBUG
201                      fprintf (stderr, "(( \n %s \n %s \n ))", *arrayStep,
202                               ivalue);
203#endif
204                      delete[]ivalue;
205                      arrayStep++;
206                    }
207                  if (tmpMap != NULL)
208                    addToMap (tmpMap, "request", post_data);
209                  else
210                    tmpMap = createMap ("request", post_data);
211                }
212              delete[]post_data;
213            }
214        }
215      else
216        {
217#ifdef DEBUG
218          dumpMap (tmpMap);
219#endif
220       
221       char **array, **arrayStep;
222          if (cgiFormEntries (&array,&cgi) != cgiFormSuccess)
223            {
224              return 1;
225            }
226          arrayStep = array;
227          while (*arrayStep)
228            {
229              char *value = new char[cgi->cgiContentLength];
230              cgiFormStringNoNewlines (*arrayStep, value, cgi->cgiContentLength,&cgi);
231#ifdef DEBUG
232              fprintf (stderr, "(( \n %s \n %s \n ))", *arrayStep, value);
233#endif
234              if (tmpMap != NULL)
235                addToMap (tmpMap, *arrayStep, value);
236              else
237                tmpMap = createMap (*arrayStep, value);
238              arrayStep++;
239              delete[]value;
240            }
241          cgiStringArrayFree (array);
242        }
243
244#ifdef WIN32
245      map *tmpReq = getMap (tmpMap, "rfile");
246      if (tmpReq != NULL)
247        {
248          FILE *lf = fopen (tmpReq->value, "r");
249          fseek (lf, 0, SEEK_END);
250          long flen = ftell (lf);
251          fseek (lf, 0, SEEK_SET);
252          char *buffer = (char *) malloc ((flen + 1) * sizeof (char));
253          fread (buffer, flen, 1, lf);
254          fclose (lf);
255          addToMap (tmpMap, "request", buffer);
256          free (buffer);
257          cgiContentLength = flen + 9;
258        }
259#endif
260  /**
261   * In case that the POST method was used, then check if params came in XML
262   * format else try to use the attribute "request" which should be the only
263   * one.
264   */
265      if (strncasecmp (cgi->cgiRequestMethod, "post", 4) == 0 ||
266          (count (tmpMap) == 1 && strncmp (tmpMap->value, "<", 1) == 0)
267#ifdef WIN32
268          || tmpReq != NULL
269#endif
270        )
271        {
272    /**
273     * Store the original XML request in xrequest map
274     */
275          map *t1 = getMap (tmpMap, "request");
276          if (t1 != NULL && strncasecmp (t1->value, "<", 1) == 0)
277            {
278              addToMap (tmpMap, "xrequest", t1->value);
279              xmlInitParser ();
280              xmlDocPtr doc = xmlParseMemory (t1->value, cgi->cgiContentLength);
281              {
282                xmlXPathObjectPtr reqptr = extractFromDoc (doc,
283                                                           "/*[local-name()='Envelope']/*[local-name()='Body']/*");
284                if (reqptr != NULL)
285                  {
286                    xmlNodeSet *req = reqptr->nodesetval;
287                    if (req != NULL && req->nodeNr == 1)
288                      {
289                        addToMap (tmpMap, "soap", "true");
290                        for (int k = 0; k < req->nodeNr; k++)
291                          {
292                            //xmlNsPtr ns=xmlNewNs(req->nodeTab[k],BAD_CAST "http://www.w3.org/2001/XMLSchema-instance",BAD_CAST "xsi");
293                            xmlDocSetRootElement (doc, req->nodeTab[k]);
294                            xmlChar *xmlbuff;
295                            int buffersize;
296                            xmlDocDumpFormatMemoryEnc (doc, &xmlbuff,
297                                                       &buffersize, "utf-8",
298                                                       1);
299                            addToMap (tmpMap, "xrequest", (char *) xmlbuff);
300                            xmlFree (xmlbuff);
301                          }
302                      }
303                    xmlXPathFreeObject (reqptr);
304                  }
305              }
306
307              xmlNodePtr cur = xmlDocGetRootElement (doc);
308              char *tval;
309              tval = NULL;
310              tval = (char *) xmlGetProp (cur, BAD_CAST "service");
311              if (tval != NULL)
312                {
313                  addToMap (tmpMap, "service", tval);
314                  xmlFree (tval);
315                }
316              tval = NULL;
317              tval = (char *) xmlGetProp (cur, BAD_CAST "language");
318              if (tval != NULL)
319                {
320                  addToMap (tmpMap, "language", tval);
321                  xmlFree (tval);
322                }
323              const char *requests[3] =
324                { "GetCapabilities", "DescribeProcess", "Execute" };
325              for (int j = 0; j < 3; j++)
326                {
327                  char tt[128];
328                  sprintf (tt, "/*[local-name()='%s']", requests[j]);
329                  xmlXPathObjectPtr reqptr = extractFromDoc (doc, tt);
330                  if (reqptr != NULL)
331                    {
332                      xmlNodeSet *req = reqptr->nodesetval;
333#ifdef DEBUG
334                      fprintf (stderr, "%i", req->nodeNr);
335#endif
336                      if (req != NULL && req->nodeNr == 1)
337                        {
338                          if (t1->value != NULL)
339                            free (t1->value);
340                          t1->value = zStrdup (requests[j]);
341                          j = 2;
342                        }
343                      xmlXPathFreeObject (reqptr);
344                    }
345                }
346              if (strncasecmp (t1->value, "GetCapabilities", 15) == 0)
347                {
348                  xmlXPathObjectPtr versptr =
349                    extractFromDoc (doc, "/*/*/*[local-name()='Version']");
350                  xmlNodeSet *vers = versptr->nodesetval;
351                  xmlChar *content = xmlNodeListGetString (doc,
352                                                           vers->
353                                                           nodeTab
354                                                           [0]->xmlChildrenNode,
355                                                           1);
356                  addToMap (tmpMap, "version", (char *) content);
357                  xmlXPathFreeObject (versptr);
358                  xmlFree (content);
359                }
360              else
361                {
362                  tval = NULL;
363                  tval = (char *) xmlGetProp (cur, BAD_CAST "version");
364                  if (tval != NULL)
365                    {
366                      addToMap (tmpMap, "version", tval);
367                      xmlFree (tval);
368                    }
369                  tval = (char *) xmlGetProp (cur, BAD_CAST "language");
370                  if (tval != NULL)
371                    {
372                      addToMap (tmpMap, "language", tval);
373                      xmlFree (tval);
374                    }
375                  xmlXPathObjectPtr idptr =
376                    extractFromDoc (doc, "/*/*[local-name()='Identifier']");
377                  if (idptr != NULL)
378                    {
379                      xmlNodeSet *id = idptr->nodesetval;
380                      if (id != NULL)
381                        {
382                          char *identifiers = NULL;
383                          identifiers =
384                            (char *) calloc (cgi->cgiContentLength, sizeof (char));
385                          identifiers[0] = 0;
386                          for (int k = 0; k < id->nodeNr; k++)
387                            {
388                              xmlChar *content = xmlNodeListGetString (doc,
389                                                                       id->nodeTab
390                                                                       [k]->
391                                                                       xmlChildrenNode,
392                                                                       1);
393                              if (strlen (identifiers) > 0)
394                                {
395                                  char *tmp = zStrdup (identifiers);
396                                  snprintf (identifiers,
397                                            strlen (tmp) +
398                                            xmlStrlen (content) + 2, "%s,%s",
399                                            tmp, content);
400                                  free (tmp);
401                                }
402                              else
403                                {
404                                  snprintf (identifiers,
405                                            xmlStrlen (content) + 1, "%s",
406                                            content);
407                                }
408                              xmlFree (content);
409                            }
410                          xmlXPathFreeObject (idptr);
411                          addToMap (tmpMap, "Identifier", identifiers);
412                          free (identifiers);
413                        }
414                    }
415                }
416              xmlFreeDoc (doc);
417              xmlCleanupParser ();
418            }
419          else
420            {
421              freeMap (&tmpMap);
422              free (tmpMap);
423              tmpMap = createMap ("not_valid", "true");
424            }
425
426          char *token, *saveptr;
427          token = strtok_r (cgi->cgiQueryString, "&", &saveptr);
428          while (token != NULL)
429            {
430              char *token1, *saveptr1;
431              char *name = NULL;
432              char *value = NULL;
433              token1 = strtok_r (token, "=", &saveptr1);
434              while (token1 != NULL)
435                {
436                  if (name == NULL)
437                    name = zStrdup (token1);
438                  else
439                    value = zStrdup (token1);
440                  token1 = strtok_r (NULL, "=", &saveptr1);
441                }
442              addToMap (tmpMap, name, value);
443              free (name);
444              free (value);
445              name = NULL;
446              value = NULL;
447              token = strtok_r (NULL, "&", &saveptr);
448            }
449
450        }
451
452      if (strncasecmp (cgi->cgiContentType, "multipart/form-data", 19) == 0)
453        {
454          map *tmp = getMap (tmpMap, "dataInputs");
455          if (tmp != NULL)
456            {
457              addToMap (tmpMap, "dataInputs",
458                        strstr (strQuery, "dataInputs=") + 11);
459            }
460        }
461
462      if (strQuery != NULL)
463        free (strQuery);
464/*
465      json_object *obj;
466      maptojson(&obj,tmpMap);
467      fprintf(stderr,"%s\n",json_object_to_json_string(obj));
468      fflush(stderr);
469  */   
470      runRequest (&tmpMap,&cgi,request);
471
472  /**
473   * Required but can't be made after executing a process using POST requests.
474   */
475      if ( /*strncasecmp(cgiRequestMethod,"post",4)!=0 && count(tmpMap)!=1 && */ tmpMap != NULL)
476        {
477          freeMap (&tmpMap);
478          free (tmpMap);
479        }
480        // a verifier fait planter
481      cgiFreeResources (&cgi);
482
483      FCGX_Finish_r(request);
484      return 0;
485}
486
487
488
489int
490main (int argc, char *argv[])
491{
492
493
494 int debug_flag = 0;
495 int background_flag = 0;
496 char *file_value = NULL;
497 int index;
498 int c;
499
500 opterr = 0;
501 while ((c = getopt (argc, argv, "dbhf:")) != -1)
502      switch (c)
503      {
504      case 'd':
505        debug_flag = 1;
506        break;
507      case 'b':
508        background_flag = 1;
509        break;
510      case 'h':
511        fprintf(stderr,"TODO: need to print help\n");
512        fflush(stderr);
513        return 0;
514      case 'f':
515        file_value = optarg;
516        break;
517      case '?':
518        if (optopt == 'f')
519          fprintf (stderr, "Option -%c requires an argument.\n", optopt);
520        else if (isprint (optopt))
521          fprintf (stderr, "Unknown option `-%c'.\n", optopt);
522        else
523          fprintf (stderr,"Unknown option character `\\x%x'.\n",optopt);
524        return 1;
525      default:
526        abort ();
527      }
528
529
530  maps *conf;
531  conf = (maps *) malloc (MAPS_SIZE);
532 
533  int ret = conf_read (file_value, conf);
534  if ( ret == 2){
535    //a verifier mais conf_read ne renvoie jamais 0
536    fprintf(stderr,"Erreur lors de la lecture de %s\n",file_value);
537    return 1;
538  }
539/*
540 json_object *jobj;
541  mapstojson(&jobj,conf);
542  fprintf (stderr,"The json object created: %s\n",json_object_to_json_string(jobj));
543    freeMaps(&conf);
544 
545  maps *conf_tmp;
546  jsontomaps(jobj,&conf_tmp);
547  dumpMaps(conf_tmp);
548   return 1;
549*/
550  char *rootDir;
551  map *m_rootDir = getMapFromMaps (conf, "server", "rootDir");
552  if (m_rootDir == NULL){
553    fprintf(stderr,"Configuration error: rootDir");
554    return 2;
555  }
556  else {
557   rootDir = (char*)malloc((strlen(m_rootDir->value) +1)*sizeof(char*));
558   strncpy(rootDir,m_rootDir->value,strlen(m_rootDir->value));
559   rootDir[strlen(m_rootDir->value)] = '\0';
560   //freeMap(&m_rootDir);
561  }
562
563  int req_worker;
564  map *m_req_worker = getMapFromMaps (conf, "server", "req_worker");
565  if (m_req_worker == NULL){
566    fprintf(stderr,"Configuration error: req_worker not found");
567    return 2;
568  }
569  else {
570    req_worker=atoi(m_req_worker->value);
571    //freeMap(&m_req_worker);
572    if (req_worker == 0){
573        fprintf(stderr,"Configuration error: req_worker");
574        return 2;
575    }
576  }
577 
578  int async_worker;
579  map *m_async_worker = getMapFromMaps (conf, "server", "async_worker"); 
580  if (m_async_worker == NULL){
581    fprintf(stderr,"Configuration error: async_worker not found");
582    return 2;
583  }
584  else {
585    async_worker = atoi(m_async_worker->value);
586    //freeMap(&m_async_worker);
587    if (async_worker == 0){
588        fprintf(stderr,"Configuration error: req_worker");
589        return 2;
590    }
591  }
592
593  int max_requests;
594  map *m_max_requests = getMapFromMaps (conf, "server", "max_requests");
595  if (m_max_requests == NULL){
596    fprintf(stderr,"Configuration error: max_requests");
597    return 2;
598  }
599  else {
600    max_requests = atoi(m_max_requests->value);
601    //freeMap(&m_max_requests);
602    if (max_requests == 0){
603        fprintf(stderr,"Configuration error: max_requests");
604        return 2;
605    }
606  }
607
608  map *m_listen = getMapFromMaps (conf, "server", "listen");
609  char *listen;
610  if (m_listen == NULL){
611    fprintf(stderr,"Configuration error: listen not found");
612    return 2;
613  }
614  else {
615    listen = (char *)malloc((strlen(m_listen->value) +1)*sizeof(char*));
616    strncpy(listen,m_listen->value,strlen(m_listen->value));
617    listen[strlen(m_listen->value)] = '\0';
618    //freeMap(&m_listen);
619  }
620  int listen_owner;
621  map *m_listen_owner = getMapFromMaps (conf, "server", "listen_owner");
622  if (m_listen_owner == NULL){
623    fprintf(stderr,"Configuration error: listen_owner");
624    return 2;
625  }
626  else {
627    listen_owner = atoi(m_listen_owner->value);
628    //freeMap(&m_listen_owner);
629    if (listen_owner == 0){
630        fprintf(stderr,"Configuration error: listen_owner");
631        return 2;
632    }
633  }
634
635  int listen_group;
636  map *m_listen_group = getMapFromMaps (conf, "server", "listen_group");
637  if (m_listen_group == NULL){
638    fprintf(stderr,"Configuration error: listen_group");
639    return 2;
640  }
641  else {
642    listen_group = atoi(m_listen_group->value);
643    //freeMap(&m_listen_group);
644    if (listen_group == 0){
645        fprintf(stderr,"Configuration error: listen_group");
646        return 2;
647    }
648  }
649
650  char * listen_mode;
651  map *m_listen_mode = getMapFromMaps (conf, "server", "listen_mode");
652  if (m_listen_mode == NULL){
653    fprintf(stderr,"Configuration error: listen_mode");
654    return 2;
655  }
656  else {
657    listen_mode = (char *)malloc((strlen(m_listen_mode->value) +1)*sizeof(char*));
658    strncpy(listen_mode,m_listen_mode->value,strlen(m_listen_mode->value));
659    listen_mode[strlen(m_listen_mode->value)] = '\0';
660    //freeMap(&m_listen_mode);
661  }
662
663  int listen_queue;
664  map *m_listen_queue = getMapFromMaps (conf, "server", "listen_queue");
665  if (m_listen_queue == NULL){
666    fprintf(stderr,"Configuration error: listen_queue");
667    return 2;
668  }
669  else {
670    listen_queue = atoi(m_listen_queue->value);
671    //freeMap(&m_listen_queue);
672    if (listen_queue == 0){
673        fprintf(stderr,"Configuration error: listen_queue");
674        return 2;
675    }
676  }
677
678  int id_user;
679  map *m_user = getMapFromMaps (conf, "server", "uid");
680  if (m_user == NULL){
681    fprintf(stderr,"Configuration error: id_user");
682    return 2;
683  }
684  else {
685    id_user = atoi(m_user->value);
686    //freeMap(&m_user);
687    if (id_user == 0){
688        fprintf(stderr,"Configuration error: id_user");
689        return 2;
690    }
691  }
692
693
694  int id_group;
695  map *m_group = getMapFromMaps (conf, "server", "gid");
696  if (m_group == NULL){
697    fprintf(stderr,"Configuration error: gid");
698    return 2;
699  }
700  else {
701    id_group = atoi(m_group->value);
702    //freeMap(&m_group);
703    if (id_group == 0){
704        fprintf(stderr,"Configuration error: id_group");
705        return 2;
706    }
707  }
708
709
710  char * amqp_host;
711  map * m_amqp_host = getMapFromMaps (conf, "rabbitmq", "host");
712  if (m_amqp_host == NULL){
713    fprintf(stderr,"Configuration error: [rabbitmq] host");
714    return 2;
715  }
716  else {
717    amqp_host = (char *)malloc((strlen(m_amqp_host->value) +1)*sizeof(char*));
718    strncpy(amqp_host,m_amqp_host->value,strlen(m_amqp_host->value));
719    amqp_host[strlen(m_amqp_host->value)] = '\0';
720 }
721
722  int amqp_port;
723  map *m_amqp_port = getMapFromMaps (conf, "rabbitmq", "port");
724  if (m_amqp_port == NULL){
725    fprintf(stderr,"Configuration error: [rabbitmq] port");
726    return 2;
727  }
728  else {
729    amqp_port = atoi(m_amqp_port->value);
730    if (amqp_port == 0){
731        fprintf(stderr,"Configuration error: [rabbitmq] port");
732        return 2;
733    }
734  }
735
736  char * amqp_user;
737  map * m_amqp_user = getMapFromMaps (conf, "rabbitmq", "user");
738  if (m_amqp_user == NULL){
739    fprintf(stderr,"Configuration error: [rabbitmq] user");
740    return 2;
741  }
742  else {
743    amqp_user = (char *)malloc((strlen(m_amqp_user->value) +1)*sizeof(char*));
744    strncpy(amqp_user,m_amqp_user->value,strlen(m_amqp_user->value));
745    amqp_user[strlen(m_amqp_user->value)] = '\0';
746 }
747
748  char * amqp_passwd;
749  map * m_amqp_passwd = getMapFromMaps (conf, "rabbitmq", "passwd");
750  if (m_amqp_passwd == NULL){
751    fprintf(stderr,"Configuration error: [rabbitmq] passwd");
752    return 2;
753  }
754  else {
755    amqp_passwd = (char *)malloc((strlen(m_amqp_passwd->value) +1)*sizeof(char*));
756    strncpy(amqp_passwd,m_amqp_passwd->value,strlen(m_amqp_passwd->value));
757    amqp_passwd[strlen(m_amqp_passwd->value)] = '\0';
758 }
759
760  char * amqp_exchange;
761  map * m_amqp_exchange = getMapFromMaps (conf, "rabbitmq", "exchange");
762  if (m_amqp_exchange == NULL){
763    fprintf(stderr,"Configuration error: [rabbitmq] exchange");
764    return 2;
765  }
766  else {
767    amqp_exchange = (char *)malloc((strlen(m_amqp_exchange->value) +1)*sizeof(char*));
768    strncpy(amqp_exchange,m_amqp_exchange->value,strlen(m_amqp_exchange->value));
769    amqp_exchange[strlen(m_amqp_exchange->value)] = '\0';
770 }
771
772  char * amqp_routingkey;
773  map * m_amqp_routingkey = getMapFromMaps (conf, "rabbitmq", "routingkey");
774  if (m_amqp_routingkey == NULL){
775    fprintf(stderr,"Configuration error: [amqp] routingkey");
776    return 2;
777  }
778  else {
779    amqp_routingkey = (char *)malloc((strlen(m_amqp_routingkey->value) +1)*sizeof(char*));
780    strncpy(amqp_routingkey,m_amqp_routingkey->value,strlen(m_amqp_routingkey->value));
781    amqp_routingkey[strlen(m_amqp_routingkey->value)] = '\0';
782 }
783
784  char * amqp_queue;
785  map * m_amqp_queue = getMapFromMaps (conf, "rabbitmq", "queue");
786  if (m_amqp_queue == NULL){
787    fprintf(stderr,"Configuration error: [rabbitmq] queue");
788    return 2;
789  }
790  else {
791    amqp_queue = (char *)malloc((strlen(m_amqp_queue->value) +1)*sizeof(char*));
792    strncpy(amqp_queue,m_amqp_queue->value,strlen(m_amqp_queue->value));
793    amqp_queue[strlen(m_amqp_queue->value)] = '\0';
794 }
795
796  char * status_user;
797  map * m_status_user = getMapFromMaps (conf, "status", "user");
798  if (m_status_user == NULL){
799    fprintf(stderr,"Configuration error: [status] user");
800    return 2;
801  }
802  else {
803    status_user = (char *)malloc((strlen(m_status_user->value) +1)*sizeof(char*));
804    strncpy(status_user,m_status_user->value,strlen(m_status_user->value));
805    status_user[strlen(m_status_user->value)] = '\0';
806  }
807
808
809  char * status_passwd;
810  map * m_status_passwd = getMapFromMaps (conf, "status", "passwd");
811  if (m_status_passwd == NULL){
812    fprintf(stderr,"Configuration error: [status] passwd");
813    return 2;
814  }
815  else {
816    status_passwd = (char *)malloc((strlen(m_status_passwd->value) +1)*sizeof(char*));
817    strncpy(status_passwd,m_status_passwd->value,strlen(m_status_passwd->value));
818    status_passwd[strlen(m_status_passwd->value)] = '\0';
819  }
820
821  char * status_bdd;
822  map * m_status_bdd = getMapFromMaps (conf, "status", "bdd");
823  if (m_status_bdd == NULL){
824    fprintf(stderr,"Configuration error: [status] bdd");
825    return 2;
826  }
827  else {
828    status_bdd = (char *)malloc((strlen(m_status_bdd->value) +1)*sizeof(char*));
829    strncpy(status_bdd,m_status_bdd->value,strlen(m_status_bdd->value));
830    status_bdd[strlen(m_status_bdd->value)] = '\0';
831  }
832
833  char * status_host;
834  map * m_status_host = getMapFromMaps (conf, "status", "host");
835  if (m_status_host == NULL){
836    fprintf(stderr,"Configuration error: [status] host");
837    return 2;
838  }
839  else {
840    status_host = (char *)malloc((strlen(m_status_host->value) +1)*sizeof(char*));
841    strncpy(status_host,m_status_host->value,strlen(m_status_host->value));
842    status_host[strlen(m_status_host->value)] = '\0';
843  }
844
845  int status_port;
846  map *m_status_port = getMapFromMaps (conf, "status", "port");
847  if (m_status_port == NULL){
848    fprintf(stderr,"Configuration error: [status] port");
849    return 2;
850  }
851  else {
852    status_port = atoi(m_status_port->value);
853    if (status_port == 0){
854        fprintf(stderr,"Configuration error: [status] port");
855        return 2;
856    }
857  }
858  init_sql(status_host,status_user,status_passwd,status_bdd,status_port);
859
860  int sock = FCGX_OpenSocket(listen, listen_queue); 
861  init_services_conf (rootDir);
862 
863  ret = chown(listen, listen_owner, listen_group); 
864  if (ret != 0){
865    fprintf(stderr,"Change owner error on : %s\n",listen);
866    return 3;
867  }
868 
869  ret = setgid(id_group);
870  if (ret != 0){
871    fprintf(stderr,"Change gid error\n");
872    return 3;
873  }
874
875  ret = setuid(id_user);
876  if (ret != 0){
877    fprintf(stderr,"Change uid error\n");
878    return 3;
879  }
880
881  init_amqp(amqp_host,amqp_port,amqp_user, amqp_passwd, amqp_exchange, amqp_routingkey,amqp_queue);
882
883
884  int fork_status = fork();
885  if (fork_status == 0){
886    //child
887    int master_sync= getpid();
888    fprintf(stderr,"Master sync%d\n",getpid());
889    FCGX_Init();
890    FCGX_Request request;
891    FCGX_InitRequest(&request, sock, 0);
892    int i;
893    int count_request = 0;
894    for (i = 0; i< req_worker; i++){
895        fork_status = fork();
896        if (fork_status == 0){
897            fprintf(stderr,"child sync %d \n",getpid());
898            fflush(stderr);
899            break;
900        }
901    }
902    while(1){
903        /* mode synchrone */
904        if (master_sync != getpid()){
905            while(FCGX_Accept_r(&request) == 0){
906                process(&request);
907                count_request ++;
908                if (count_request >= max_requests){
909                    fprintf(stderr,"Max request stop process\n");
910                    fflush(stderr);
911                    exit(0);
912                }
913            }
914        }
915        else {
916            wait(0);
917            fprintf(stderr,"Master sync %d\n",getpid());
918            fprintf(stderr,"New sync Child\n");
919            fflush(stderr);
920            fork();
921        }
922    }
923  }
924  else {
925   int master_async = getpid();
926   fprintf(stderr,"Master async %d\n",master_async);
927    int fork_s;
928    int j;
929    for (j = 0; j< async_worker; j++){
930        fork_s = fork();
931        if (fork_s == 0){
932            fprintf(stderr,"child async %d \n",getpid());
933            fflush(stderr);
934            break;
935        }
936    }
937    json_object *msg_obj;
938    json_object *maps_obj;
939    maps * map_c;
940    json_object *req_format_jobj;
941    maps * request_input_real_format;
942    json_object *req_jobj;
943    map * request_inputs;
944    json_object *outputs_jobj;
945    maps * request_output_real_format;
946
947    char *msg;
948    int c;
949    int eres;
950    char * service_identifier;
951    service * s1 = NULL;
952    while(1){
953        /* mode asynchrone */
954        if( master_async != getpid()){
955            /*traitement des requetes de la queue */
956            bind_amqp();
957            init_consumer();
958            while(1){
959               
960                c = consumer_amqp(&msg);
961                if (c == 0)
962                    break;
963                msg_obj = json_tokener_parse(msg);
964               
965                free(msg);
966                maps_obj = json_object_object_get(msg_obj,"maps");
967
968                map_c = jsontomaps(maps_obj);
969
970                req_format_jobj = json_object_object_get(msg_obj,"request_input_real_format");
971                request_input_real_format = jsontomaps(req_format_jobj);
972
973                req_jobj = json_object_object_get(msg_obj,"request_inputs");
974                request_inputs = jsontomap(req_jobj);
975
976                outputs_jobj = json_object_object_get(msg_obj,"request_output_real_format");
977                request_output_real_format = jsontomaps(outputs_jobj);
978               
979                json_object_put(msg_obj);
980
981                /* traitemement du message */
982                /* Recherche des references */
983                maps* tmp=request_input_real_format;
984                HINTERNET hInternet = InternetOpen ((LPCTSTR) "ZooWPSClient\0",INTERNET_OPEN_TYPE_PRECONFIG, NULL, NULL, 0);
985                while(tmp!=NULL){
986                    map * tmp_map = getMap(tmp->content,"xlink:href");
987                    if (tmp_map != NULL){
988                        if (loadRemoteFile(&map_c, &tmp_map, &hInternet,tmp_map->value) < 0) {
989                            /* passer le status failed dans la base de donnée */
990                            fprintf(stderr,"Erreur de chargement \n");
991                        }
992                    }
993                    tmp=tmp->next;
994                }
995                runHttpRequests (&map_c, &request_input_real_format, &hInternet);
996                InternetCloseHandle (&hInternet);
997                free(tmp); 
998                map * uuid = getMapFromMaps(map_c,"lenv","usid");
999                if (uuid != NULL)
1000                    start_job(uuid->value);
1001                map *t=createMap("background","1");
1002                maps * lenv = getMaps(map_c,"lenv");
1003                addMapToMap(&lenv->content,t);
1004                freeMap(&t);
1005                free(t);
1006               
1007                map * m_identifier = getMap (request_inputs, "Identifier");
1008               
1009                service_identifier = zStrdup (m_identifier->value);
1010
1011                s1 = search_service (service_identifier);
1012                free(service_identifier);
1013               
1014
1015                //dumpMaps(request_input_real_format);
1016
1017                loadServiceAndRun(&map_c, s1,request_inputs,&request_input_real_format, &request_output_real_format, &eres,NULL,NULL);
1018                if (eres == SERVICE_SUCCEEDED) {
1019                    outputResponse (s1,request_input_real_format,request_output_real_format,request_inputs, 0, map_c, eres,NULL,NULL);
1020                }
1021                   
1022                   
1023                //dumpMaps(request_output_real_format);
1024                //fprintf(stderr,"################################################################\n");
1025                //dumpMaps(map_c);
1026
1027                outputResponse (s1,request_input_real_format,request_output_real_format,request_inputs, 0, map_c, eres,NULL,NULL);
1028
1029               
1030                freeMaps(&map_c);
1031                map_c= NULL;
1032               
1033                freeMaps(&request_input_real_format);
1034                request_input_real_format = NULL;
1035
1036                //dumpMap(request_inputs);
1037                freeMap(&request_inputs);
1038                request_inputs = NULL;
1039               
1040                //dumpMaps(request_output_real_format);
1041                freeMaps(&request_output_real_format);
1042                request_output_real_format = NULL;
1043                consumer_ack_amqp(c);
1044               
1045
1046            }
1047            close_amqp();
1048           
1049
1050
1051
1052           
1053        }
1054        else {
1055            wait(0);
1056            fprintf(stderr,"Master async %d\n",getpid());
1057            fprintf(stderr,"New async Child\n");
1058            fflush(stderr);
1059            fork();
1060        }
1061    }
1062
1063  }
1064 
1065  return 0;
1066}
1067 
Note: See TracBrowser for help on using the repository browser.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png