source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 857

Last change on this file since 857 was 857, checked in by djay, 6 years ago

Send same informations used for inputs for outputs.

  • Property svn:keywords set to Id
File size: 24.0 KB
RevLine 
[845]1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
[854]32#include "sqlapi.h"
[851]33#include <pthread.h>
[854]34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
[845]38
[854]39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
[845]44#ifdef __cplusplus
45extern "C" {
46#endif
47
[854]48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
[851]62  int nbThreads=0;
[854]63  /**
64   * Current step
65   */
[851]66  int cStep=0;
[854]67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
[851]74  pthread_t* myThreads=NULL;
[854]75  /**
76   * Steps array
77   */
[851]78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
[854]87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
[851]91 
[845]92  /**
[851]93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
[850]95   *
[851]96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
[850]99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
[851]113
114
115  /**
116   * Practically invoke the callback, meaning sending the HTTP POST request.
117   *
118   * @param args local_params containing all the variables required
119   */
120  void* _invokeCallback(void* args){
121    local_params* arg=(local_params*)args;
122    HINTERNET hInternet,res1;
123    hInternet=InternetOpen("ZooWPSClient\0",
124                           INTERNET_OPEN_TYPE_PRECONFIG,
125                           NULL,NULL, 0);
126    if(!CHECK_INET_HANDLE(hInternet)){
127      InternetCloseHandle (&hInternet);
128      return false;
129    }
130    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
131    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
132    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
133    hInternet.waitingRequests[0] = zStrdup(URL);
134    free(URL);
[854]135#ifdef CALLBACK_DEBUG
136    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
[851]137    fprintf(stderr," * JSON: [%s] \n",jsonStr);
138    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
139    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
[854]140#endif
141    while( (arg->step!=7 || isOngoing>0) &&
[851]142           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
143           ){
[854]144      struct timespec tv;
145      tv.tv_sec = 0;
146      tv.tv_nsec = (long) 5*1e+9;
147      nanosleep(&tv, &tv);
148      //sleep(1);
[851]149    }
[854]150    isOngoing=1;
151#ifdef CALLBACK_DEBUG
[851]152    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
153    int i=0;
154    for(i=0;i<7;i++){
155      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
156    }
[854]157#endif
158    const struct tm *tm;
159    size_t len;
160    time_t now;
161    char *tmp1;
162    map *tmpStatus;
163   
164    now = time ( NULL );
165    tm = localtime ( &now );
166   
167    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
168    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
169
170#ifdef CALLBACK_DEBUG   
171    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
[851]172    fflush(stderr);
[854]173#endif   
174    free(tmp1);
[851]175    res1 = InternetOpenUrl (&hInternet,
176                            hInternet.waitingRequests[0], 
177                            (char*)jsonStr, strlen(jsonStr),
178                            INTERNET_FLAG_NO_CACHE_WRITE,
179                            0);
180    AddHeaderEntries(&hInternet,arg->conf);
181    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
182    processDownloads(&hInternet);
[854]183    now = time ( NULL );
184    tm = localtime ( &now );
185    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
186    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
187   
188#ifdef CALLBACK_DEBUG   
189    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
190#endif   
191    free(tmp1);
[851]192    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
193                                 * sizeof (char));
194    if (tmp == NULL)
195      {
196        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
197        setMapInMaps(arg->conf,"lenv","code","InternalError");
198        return NULL;
199      }
200    size_t bRead;
201    InternetReadFile (hInternet.ihandle[0],
202                      (LPVOID) tmp,
203                      hInternet.
204                      ihandle[0].nDataLen,
205                      &bRead);
206    tmp[hInternet.ihandle[0].nDataLen] = 0;
207    json_object_put(arg->res);
208    InternetCloseHandle(&hInternet);
[854]209    isOngoing=0;
[851]210    if(cStep==0 || cStep==6 || arg->state==1)
211      cStep=arg->step+1;
[854]212#ifdef CALLBACK_DEBUG   
[851]213    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__);
214    for(i=0;i<7;i++){
215      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
216    }
217    fprintf(stderr,"Result: \n%s\n\n",tmp);
218    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
219    fflush(stderr);
[854]220#endif
221    steps[arg->step][arg->state]=true;
[851]222    free(tmp);
[854]223    //free(args);
224    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
225    fflush(stderr);
[851]226    pthread_exit(NULL);
227  }
[850]228 
229  /**
[845]230   * Invoke the callback in case there is a [callback] section containing a url parameter
231   *
232   * @param m the maps containing the main configuration file definitions
233   * @param inputs the inputs defined in the request (can be null if not yet initialized)
234   * @param inputs the outputs provided in the request (can be null if not yet initialized)
235   * @param step the step number, steps are defined as:
236   *  0: Analyze creation
237   *  1: Fetching Data Inputs
238   *  2: Uploading data inputs to cluster
239   *  3: Creating Job Script
240   *  4: Submitting Job to Cluster
241   *  5: Downloading processed output from cluster
242   *  6: Finalize
243   *  7: Dismiss or Error
244   * @param state 0 in case the step starts, 1 when it ends
245   * @return bool true in case of success, false in other cases
246   */
[851]247  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
248    map* url=getMapFromMaps(conf,"callback","url");
[845]249    if(url==NULL)
250      return false;
[850]251     
[851]252    maps* lenv=getMaps(conf,"lenv");
[850]253    map* sname=getMap(lenv->content,"identifier");
[851]254    if(sname!=NULL && isProhibited(conf,sname->value))
[850]255      return false;
256     
[845]257    json_object *res=json_object_new_object();
258
[851]259    map* sid=getMapFromMaps(conf,"lenv","usid");
[845]260    if(sid!=NULL){
261      json_object *jsStr=json_object_new_string(sid->value);
262      json_object_object_add(res,"jobid",jsStr);
263    }
[851]264    const struct tm *tm;
265    size_t len;
266    time_t now;
267    char *tmp1;
268    map *tmpStatus;
269 
270    now = time ( NULL );
271    tm = localtime ( &now );
272
273    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
[854]274    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
[851]275    json_object *jsStr0=json_object_new_string(tmp1);
276    json_object_object_add(res,"datetime",jsStr0);
[854]277    free(tmp1);
278   
[845]279    switch(step){
280    case 0: {
281      // Create a new analyze
[851]282      maps* lenv=getMaps(conf,"lenv");
[854]283      sid=getMapFromMaps(conf,"renv","xrequest");
[845]284      if(sid!=NULL){
285        json_object *jsStr=json_object_new_string(sid->value);
286        json_object_object_add(res,"request_execute_content",jsStr);
287      }
[851]288      sid=getMapFromMaps(conf,"lenv","identifier");
[845]289      if(sid!=NULL){
290        json_object *jsStr=json_object_new_string(sid->value);
291        json_object_object_add(res,"process_identifier",jsStr);
292      }
[854]293      // Save the Execute request on disk
294      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
295      map* req=getMapFromMaps(conf,"renv","xrequest");
296      sid=getMapFromMaps(conf,"lenv","usid");
297      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
298      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
299      FILE* saveExecute=fopen(executePath,"wb");
300      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
301      fflush(saveExecute);
302      fclose(saveExecute);
303      setMapInMaps(conf,"lenv","execute_file",executePath);
304      free(executePath);
[845]305      break;
306    }
[854]307     
[845]308    case 1: {
309      // Fetching data inputs
310      maps* curs=inputs;
[854]311      dumpMaps(curs);
312      char *keys[10][2]={
[845]313        {
314          "href",
315          "ref"
316        },
317        {
318          "cache_file",
319          "cachefile"
320        },
321        {
322          "fmimeType",
323          "mimetype"
324        },
325        {
326          "size",
327          "size"
[850]328        },
329        {
330          "ref_wms_link",
331          "ref_wms_link"
332        },
333        {
334          "ref_wcs_link",
335          "ref_wcs_link"
336        },
337        {
[854]338          "ref_wcs_link",
339          "ref_wcs_link"
[850]340        },
341        {
[854]342          "ref_wcs_preview_link",
343          "ref_wcs_preview_link"
344        },
345        {
[851]346          "geodatatype",
[850]347          "datatype"
[854]348        },
349        {
350          "wcs_extent",
351          "boundingbox"
[850]352        }       
[845]353      };
354      json_object *res1=json_object_new_object();
355      while(curs!=NULL){
356        map* tmpMap=getMap(curs->content,"cache_file");
[850]357        sid=getMap(curs->content,"ref_wms_link");
[851]358        json_object *res2=json_object_new_object();
[854]359        if(tmpMap!=NULL){
360          if(sid==NULL){
361            addToMap(curs->content,"generated_file",tmpMap->value);
362            addToMap(curs->content,"storage",tmpMap->value);
363          }
364          fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
365          dumpMap(curs->content);
[851]366          struct stat buf;
367          char timeStr[ 100 ] = "";
368          if (stat(tmpMap->value, &buf)==0){
369            strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
370            json_object *jsStr=json_object_new_string(timeStr);
371            json_object_object_add(res2,"creation_date",jsStr);
372          }
[845]373          tmpMap=getMap(curs->content,"fmimeType");
374          if(tmpMap!=NULL){
375            addToMap(curs->content,"mimeType",tmpMap->value);
376          }
[851]377          setReferenceUrl(conf,curs);
[845]378        }
379        int i=0;
380        int hasRef=-1;
[854]381        for(;i<10;i++){
[845]382          sid=getMap(curs->content,keys[i][0]);
383          if(sid!=NULL){
384            json_object *jsStr=json_object_new_string(sid->value);
385            json_object_object_add(res2,keys[i][1],jsStr);
[851]386            if(i==0){
[845]387              hasRef=1;
[851]388              json_object *jsStr1=json_object_new_string(getProvenance(conf,url->value));
389              json_object_object_add(res2,"dataOrigin",jsStr1);
390            }
[845]391          }
392        }
393        if(hasRef<0)
394          json_object_put(res2);
395        else
396          json_object_object_add(res1,curs->name,res2);
397        curs=curs->next;
398      }
399      json_object_object_add(res,"inputs",res1);
400      break;
401    }
[854]402     
[845]403    case 2: {
[854]404      // Update the execute request stored on disk at step 1,1 to modify the references used.
405      if(state==0){
406        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
407        fflush(stderr);
408        maps* curs=inputs;
409        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
410        fflush(stderr);
411        xmlInitParser();
412        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
413        fflush(stderr);
414        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
415        dumpMap(xmlPath);
416        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
417        fflush(stderr);
418        while(curs!=NULL){
419          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
420          fflush(stderr);
421          dumpMap(curs->content);
422          //map* bvMap=getMap(curs->content,"byValue");
423          // TODO handle mapArray
424          //if(bvMap!=NULL && strncasecmp(bvMap->value,"true",4)==0){
425          if(getMap(curs->content,"href")==NULL && getMap(curs->content,"mimeType")!=NULL){
426            map* tmpMap=getMap(curs->content,"cache_file");
427            addToMap(curs->content,"generated_file",tmpMap->value);
428            addToMap(curs->content,"storage",tmpMap->value);
429            setReferenceUrl(conf,curs);
430            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
431            fflush(stderr);
432            dumpMap(curs->content);
433            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
434            fflush(stderr);
435            const char *params[5];
436            int xmlLoadExtDtdDefaultValue;
437            int hasFile=-1;
438            map* xslPath=getMapFromMaps(conf,"callback","template");
439            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
440            fflush(stderr);
441            dumpMap(xslPath);
442            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
443            fflush(stderr);
444            map* filePath=getMap(curs->content,"ref_wfs_link");
445            if(filePath==NULL)
446              filePath=getMap(curs->content,"ref_wcs_link");
447            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
448            fflush(stderr);
449            dumpMap(filePath);
450            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
451            fflush(stderr);
452            char* inputName=curs->name;
453            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
454            fflush(stderr);
455            if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
456              break;
457            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
458            fflush(stderr);
459            char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
460            char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
461            sprintf(tmpParam,"string(\"%s\")",curs->name);         
462            sprintf(tmpParam1,"string(\"%s\")",filePath->value);           
463            params[0]="attr";
464            params[1]=tmpParam;
465            params[2]="value";
466            params[3]=tmpParam1;//filePath->value;
467            params[4]=NULL;
468            fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s \n",
469                    tmpParam,tmpParam1);
470            xmlSubstituteEntitiesDefault(1);
471            fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
472            fflush(stderr);
473            xmlLoadExtDtdDefaultValue = 0;
474            xsltStylesheetPtr cur = NULL;
475            xmlDocPtr doc, res;
476            cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
477            doc = xmlParseFile(xmlPath->value);
478            fflush(stderr);
479            res = xsltApplyStylesheet(cur, doc, params);
480            xmlChar *xmlbuff;
481            int buffersize;
482            xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
483            // Store the executeRequest in file again
484            free(tmpParam);
485            free(tmpParam1);
486            fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
487            fflush(stderr);
488            FILE* saveExecute=fopen(xmlPath->value,"wb");
489            fwrite(xmlbuff,1,buffersize,saveExecute);
490            fflush(saveExecute);
491            fclose(saveExecute);
492            xmlFree(xmlbuff);
493            xmlFreeDoc(doc);
494            xsltFreeStylesheet(cur);
495          }
496          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
497          fflush(stderr);
498          curs=curs->next;
499        }
500        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
501        fflush(stderr);
502        xmlCleanupParser();
503        fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
504        fflush(stderr);
505        FILE* f0=fopen(xmlPath->value,"rb");
506        if(f0!=NULL){
507          long flen;
508          char *fcontent;
509          fseek (f0, 0, SEEK_END);
510          flen = ftell (f0);
511          fseek (f0, 0, SEEK_SET);
512          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
513          fread(fcontent,flen,1,f0);
514          fcontent[flen]=0;
515          fclose(f0);
516          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
517          fflush(stderr);
518          map *schema=getMapFromMaps(conf,"database","schema");
519          map* sid=getMapFromMaps(conf,"lenv","usid");
520          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
521          fflush(stderr);
522          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
523          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
524          fflush(stderr);
525          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
526          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
527          fflush(stderr);
528          execSql(conf,1,req);
529          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
530          fflush(stderr);
531          free(fcontent);
532          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
533          fflush(stderr);
534          free(req);
535          fprintf(stderr,"%s %d \n",__FILE__,__LINE__);
536          fflush(stderr);
537        }
538      }
539
[845]540      // Uploading data input to cluster
[851]541      maps* in=getMaps(conf,"uploadQueue");
[845]542      if(in!=NULL){
543        maps* curs=in;
544        map* length=getMapFromMaps(in,"uploadQueue","length");
545        if(length!=NULL){
546          json_object *res1=json_object_new_object();
547          json_object *res2=json_object_new_object();
548          int limit=atoi(length->value);
549          int i=0;
550          maps* uploadQueue=getMaps(in,"uploadQueue");
551          map* tmp=uploadQueue->content;
552          for(;i<limit;i++){
553            map* tmp0=getMapArray(tmp,"input",i);
554            map* tmp1=getMapArray(tmp,"localPath",i);
555            map* tmp2=getMapArray(tmp,"targetPath",i);
556            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
557              json_object *jsStr=json_object_new_string(tmp1->value);
558              json_object_object_add(res2,"local_path",jsStr);
559              jsStr=json_object_new_string(tmp2->value);
560              json_object_object_add(res2,"target_path",jsStr);
561              json_object_object_add(res1,tmp0->value,res2);
562            }
563          }
564          json_object_object_add(res,"inputs",res1);
565        }
566        //json_object_object_add(res,"inputs",in);
567      }
568      break;
569    }
[854]570     
[845]571    case 3: {
572      // Generating job script
[851]573      sid=getMapFromMaps(conf,"lenv","local_script");
[845]574      if(sid!=NULL){
575        json_object *jsStr=json_object_new_string(sid->value);
576        json_object_object_add(res,"script",jsStr);
577      }
578      break;
579    }
[854]580     
[845]581    case 4: {
582      // Submitting job to cluster
[851]583      sid=getMapFromMaps(conf,"lenv","remote_script");
[845]584      if(sid!=NULL){
585        json_object *jsStr=json_object_new_string(sid->value);
586        json_object_object_add(res,"script",jsStr);
587      }
588      break;
589    }
[854]590     
[845]591    case 5: {
592      // Downloading process outputs from cluster
[851]593      //json_object* in=mapsToJson(outputs);
594      maps* curs=outputs;
[854]595      dumpMaps(curs);
596      char *keys[10][2]={
[851]597        {
598          "Reference",
599          "ref"
600        },
601        {
[854]602          "generated_file",
[851]603          "cachefile"
604        },
605        {
[854]606          "mimeType",
[851]607          "mimetype"
608        },
609        {
610          "size",
611          "size"
612        },
613        {
[854]614          "geodatatype",
615          "datatype"
616        },
617        {
[857]618          "wcs_extent",
[854]619          "boundingbox"
620        },
621        {
[851]622          "ref_wms_link",
623          "ref_wms_link"
624        },
625        {
626          "ref_wcs_link",
627          "ref_wcs_link"
628        },
629        {
[857]630          "ref_wcs_preview_link",
[854]631          "ref_wcs_preview_link"
632        },
633        {
[851]634          "ref_wfs_link",
635          "ref_wfs_link"
[854]636        }       
637      };
638      char* specifics[5][2]={
639        {
640          "download_link",
641          "ref_download_link"
[851]642        },
643        {
[854]644          "wms_link",
645          "ref_wms_link"
646        },
647        {
648          "wfs_link",
649          "ref_wfs_link"
650        },
651        {
652          "wcs_link",
653          "ref_wcs_link"
654        },
655        {
656          "wcs_link",
657          "ref_wcs_preview_link"
658        }
[851]659      };
660      json_object *res1=json_object_new_object();
661      while(curs!=NULL){       
662        json_object *res2=json_object_new_object();
663        int i=0;
664        int hasRef=-1;
[854]665        for(;i<10;i++){
[851]666          sid=getMap(curs->content,keys[i][0]);
667          if(sid!=NULL){
668            json_object *jsStr=json_object_new_string(sid->value);
669            json_object_object_add(res2,keys[i][1],jsStr);
670            if(i==0)
671              hasRef=1;
672          }
673        }
674        if(hasRef>0)
675          json_object_object_add(res1,curs->name,res2);
676        else{
677          maps* curs0=curs->child;
[854]678          int i=0;
679          int bypass=-1;
680          for(i=0;i<5;i++){
681            maps* specificMaps;
682            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
683              int hasRef0=-1;
684              int i0=0;
685              for(;i0<6;i0++){
686                sid=getMap(specificMaps->content,keys[i0][0]);
687                if(sid!=NULL){
688                  json_object *jsStr=json_object_new_string(sid->value);
689                  if(i0==0){
690                    json_object_object_add(res2,specifics[i][1],jsStr);
691                  }
692                  else
693                    json_object_object_add(res2,keys[i0][1],jsStr);
694                  hasRef0=1;
695                  bypass=1;
696                  if(i==1){
697                    struct stat buf;
698                    char timeStr[ 100 ] = "";
699                    if (stat(sid->value, &buf)==0){
700                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
701                      json_object *jsStr=json_object_new_string(timeStr);
702                      json_object_object_add(res2,"creation_date",jsStr);
703                    }
704                  }
705                }
706              }       
707            }
708          }
709          if(bypass<0)
710            while(curs0!=NULL){
711              json_object *res3=json_object_new_object();
712              int i0=0;
713              int hasRef0=-1;
714              for(;i0<10;i0++){
715                sid=getMap(curs0->content,keys[i0][0]);
716                if(sid!=NULL){
717                  json_object *jsStr=json_object_new_string(sid->value);
718                  json_object_object_add(res3,keys[i0][1],jsStr);
719                  //if(i0==0)
720                  hasRef0=1;
721                }
[851]722              }
[854]723              if(hasRef0<0)
724                json_object_put(res3);
725              else
726                json_object_object_add(res2,curs0->name,res3);
727              curs0=curs0->next;
[851]728            }
729          json_object_object_add(res1,curs->name,res2);
730        }
731        curs=curs->next;
[845]732      }
[851]733      json_object_object_add(res,"outputs",res1);
[845]734      break;
735    }
[854]736     
[845]737    case 6: {
738      // Finalize HPC
[854]739      char *keys[6][2]={
740        {
741          "SubmitTime",
742          "hpc_submission_date"
743        },
744        {
745          "JobId",
746          "hpc_job_identifier"
747        },
748        {
749          "JobName",
750          "hpc_job_name"
751        },
752        {
753          "StartTime",
754          "hpc_start_date"
755        },
756        {
757          "EndTime",
758          "hpc_end_date"
759        },
760        {
761          "JobState",
762          "hpc_status"
763        }       
764      };
765      int i=0;
766      if(getMaps(conf,"henv")!=NULL){
767        for(i=0;i<6;i++){
768          sid=getMapFromMaps(conf,"henv",keys[i][0]);
769          if(sid!=NULL){
770            json_object *jsStr=json_object_new_string(sid->value);
771            json_object_object_add(res,keys[i][1],jsStr);
772          }
773        }
[845]774      }
[854]775      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
776        json_object *jsStr=json_object_new_string(sid->value);
777        json_object_object_add(res,"hpc_cpu_usage",jsStr);
778      }else{
779        json_object *jsStr=json_object_new_string("1");
780        json_object_object_add(res,"hpc_cpu_usage",jsStr);
781      }
782      json_object *jsStr=json_object_new_string("succeeded");
783      json_object_object_add(res,"wps_status",jsStr);
[845]784      break;
785    }
[854]786     
[845]787    case 7: {
788      // Error or Dismiss
[851]789      sid=getMapFromMaps(conf,"lenv","message");
[845]790      if(sid!=NULL){
791        json_object *jsStr=json_object_new_string(sid->value);
792        json_object_object_add(res,"message",jsStr);
793      }
[854]794      json_object *jsStr;
795      if(state==1)
796        jsStr=json_object_new_string("dismissed");
797      else
798        jsStr=json_object_new_string("failed");
[845]799      json_object_object_add(res,"wps_status",jsStr);
800      break;
801    }
802    others: {
803        break;
804      }
805    }
[854]806
807    if(local_arguments==NULL)
808      local_arguments=(local_params**)malloc(sizeof(local_params*));
809    else
810      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
811    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
812    local_arguments[nbThreads]->conf=conf;
813    local_arguments[nbThreads]->url=url;
814    local_arguments[nbThreads]->res=res;
815    local_arguments[nbThreads]->step=step;
816    local_arguments[nbThreads]->state=state;
[851]817    //pthread_t p1;
818    if(myThreads==NULL)
819      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
820    else
821      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
[854]822    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
[851]823      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
[845]824      return false;
825    }
[851]826    //free(argumentsA);
827    nbThreads++;
[845]828    return true;
829  }
830
[854]831  /**
832   * Wait for the threads to end then, clean used memory.
833   */
[851]834  void cleanupCallbackThreads(){
835    int i=0;
836    for(i=0;i<nbThreads;i++){
[854]837      fprintf(stderr,"%s %d %d \n",__FILE__,__LINE__,i);
838      fflush(stderr);
[851]839      pthread_join(myThreads[i],NULL);
[854]840      free(local_arguments[i]);
[851]841    }
[854]842    free(local_arguments);
[851]843    free(myThreads);
844  }
845
[845]846#ifdef __cplusplus
847}
848#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png