source: branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c @ 862

Last change on this file since 862 was 862, checked in by djay, 6 years ago

Add the capability to publish heatmap or any templated mapfile using the epecific msInclude and msLayer keys for an output. For MapServer? published output, define 4096 as the default maxsize and use pixel width or height for raster files. use the correct MapServer? imagemode depending on GDALGetRasterDataType (MS_IMAGEMODE_BYTE for GDT_Byte, MS_IMAGEMODE_INT16 for GDT_Int16 and MS_IMAGEMODE_FLOAT32 for GDT_Float32). Create a text file (.maps) listing every mapfiles created for a MapServer? published output (or inputs) using saveMapNames function. Fixes in ulinet, use uuid for naming temporary files. Add dialect input to the ogr2ogr service. Use the .maps file for removing a file from the DeleteData? service

  • Property svn:keywords set to Id
File size: 25.2 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 *  Copyright 2017 GeoLabs SARL. All rights reserved.
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29#include "service_callback.h"
30#include "service_json.h"
31#include "service_internal_ms.h"
32#include "sqlapi.h"
33#include <pthread.h>
34#include <libxml/tree.h>
35#include <libxml/parser.h>
36#include <libxml/xpath.h>
37#include <libxml/xpathInternals.h>
38
39#include <libxslt/xslt.h>
40#include <libxslt/xsltInternals.h>
41#include <libxslt/transform.h>
42#include <libxslt/xsltutils.h>
43
44#ifdef __cplusplus
45extern "C" {
46#endif
47
48  /**
49   * Parameter definition to be used for sending parameters to a thread.
50   */
51  typedef struct {
52    maps *conf;      //!< the main configuration file
53    map *url;        //!< the callback url maps
54    json_object *res;//!< the JSON object to post
55    int step;        //!< the current step [0,6]
56    int state;       //!< the current state [0,1]
57  } local_params;
58
59  /**
60   * Number of threads
61   */
62  int nbThreads=0;
63  /**
64   * Current step
65   */
66  int cStep=0;
67  /**
68   * Is there any ongoing HTTP request
69   */
70  int isOngoing=0;
71  /**
72   * Threads array
73   */
74  pthread_t* myThreads=NULL;
75  /**
76   * Steps array
77   */
78  bool steps[7][2]={
79    {false,false},
80    {false,false},
81    {false,false},
82    {false,false},
83    {false,false},
84    {false,false},
85    {false,false}
86  };
87  /**
88   * Arguments array to give to the _invokeCallback thread's function
89   */
90  local_params** local_arguments;
91 
92  /**
93   * Check if a service name is prohibited, meaning that the Kernel doesn't have
94   * to invoke the callback for this specific service.
95   *
96   * @param conf the main configuration file maps
97   * @param serviceName the serviceName
98   * @return a bool true if the service is prohibited, false in other case
99   */
100  bool isProhibited(maps* conf,const char* serviceName){
101    map* plist=getMapFromMaps(conf,"callback","prohibited");
102    if(plist!=NULL){
103      char *tmp=plist->value;
104      char *tmpS=strtok(tmp,",");
105      while(tmpS!=NULL){
106        if(strcmp(serviceName,tmpS)==0)
107          return true;
108        tmpS=strtok(NULL,",");
109      }
110    }
111    return false;
112  }
113
114  /**
115   * Practically invoke the callback, meaning sending the HTTP POST request.
116   *
117   * @param args local_params containing all the variables required
118   */
119  void* _invokeCallback(void* args){
120    local_params* arg=(local_params*)args;
121    HINTERNET hInternet,res1;
122    const struct tm *tm;
123    size_t len;
124    time_t now;
125    char *tmp1;
126    map *tmpStatus;
127    hInternet=InternetOpen("ZooWPSClient\0",
128                           INTERNET_OPEN_TYPE_PRECONFIG,
129                           NULL,NULL, 0);
130    if(!CHECK_INET_HANDLE(hInternet)){
131      InternetCloseHandle (&hInternet);
132      return false;
133    }
134    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
135    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
136    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
137    hInternet.waitingRequests[0] = zStrdup(URL);
138    free(URL);
139#ifdef CALLBACK_DEBUG
140    now = time ( NULL );
141    tm = localtime ( &now );
142    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
143    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
144    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS cStep %d %d\n",pthread_self(),__FILE__,__LINE__,cStep,isOngoing);
145    fprintf(stderr," * JSON: [%s] \n",jsonStr);
146    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
147    fprintf(stderr," * DATE: %s/ \n\n",tmp1);
148    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
149    free(tmp1);
150#endif
151    while( (arg->step!=7 || isOngoing>0) &&
152           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
153           ){
154      zSleep(100);
155    }
156    isOngoing=1;
157#ifdef CALLBACK_DEBUG
158    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
159    int i=0;
160    for(i=0;i<7;i++){
161      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
162    }
163#endif
164   
165    now = time ( NULL );
166    tm = localtime ( &now );
167   
168    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
169    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
170
171#ifdef CALLBACK_DEBUG   
172    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START (%s)\n",pthread_self(),__FILE__,__LINE__,tmp1);
173    fflush(stderr);
174#endif   
175    free(tmp1);
176    res1 = InternetOpenUrl (&hInternet,
177                            hInternet.waitingRequests[0], 
178                            (char*)jsonStr, strlen(jsonStr),
179                            INTERNET_FLAG_NO_CACHE_WRITE,
180                            0);
181    AddHeaderEntries(&hInternet,arg->conf);
182    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
183    processDownloads(&hInternet);
184    now = time ( NULL );
185    tm = localtime ( &now );
186    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
187    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
188   
189#ifdef CALLBACK_DEBUG   
190    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END (%s)\n\n",pthread_self(),__FILE__,__LINE__,tmp1);
191#endif
192    free(tmp1);
193    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
194                                 * sizeof (char));
195    if (tmp == NULL)
196      {
197        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
198        setMapInMaps(arg->conf,"lenv","code","InternalError");
199        return NULL;
200      }
201    size_t bRead;
202    InternetReadFile (hInternet.ihandle[0],
203                      (LPVOID) tmp,
204                      hInternet.
205                      ihandle[0].nDataLen,
206                      &bRead);
207    tmp[hInternet.ihandle[0].nDataLen] = 0;
208    json_object_put(arg->res);
209    InternetCloseHandle(&hInternet);
210    isOngoing=0;
211    if(cStep==0 || cStep==6 || arg->state==1)
212      cStep=arg->step+1;
213#ifdef CALLBACK_DEBUG
214    now = time ( NULL );
215    tm = localtime ( &now );
216    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
217    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
218    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT (%s)\n",pthread_self(),__FILE__,__LINE__,,tmp1);
219    for(i=0;i<7;i++){
220      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
221    }
222    fprintf(stderr,"Result: \n%s\n\n",tmp);
223    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
224    fflush(stderr);
225    free(tmp1);
226#endif
227    steps[arg->step][arg->state]=true;
228    free(tmp);
229#ifdef CALLBACK_DEBUG
230    fprintf(stderr,"************************* From thread %d %s %d: EXIT\n\n",pthread_self(),__FILE__,__LINE__);
231    fflush(stderr);
232#endif
233    pthread_exit(NULL);
234  }
235 
236  /**
237   * Invoke the callback in case there is a [callback] section containing a url parameter
238   *
239   * @param m the maps containing the main configuration file definitions
240   * @param inputs the inputs defined in the request (can be null if not yet initialized)
241   * @param inputs the outputs provided in the request (can be null if not yet initialized)
242   * @param step the step number, steps are defined as:
243   *  0: Analyze creation
244   *  1: Fetching Data Inputs
245   *  2: Uploading data inputs to cluster
246   *  3: Creating Job Script
247   *  4: Submitting Job to Cluster
248   *  5: Downloading processed output from cluster
249   *  6: Finalize
250   *  7: Dismiss or Error
251   * @param state 0 in case the step starts, 1 when it ends
252   * @return bool true in case of success, false in other cases
253   */
254  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
255    map* url=getMapFromMaps(conf,"callback","url");
256    if(url==NULL)
257      return false;
258     
259    maps* lenv=getMaps(conf,"lenv");
260    map* sname=getMap(lenv->content,"identifier");
261    if(sname!=NULL && isProhibited(conf,sname->value))
262      return false;
263     
264    json_object *res=json_object_new_object();
265
266    map* sid=getMapFromMaps(conf,"lenv","usid");
267    if(sid!=NULL){
268      json_object *jsStr=json_object_new_string(sid->value);
269      json_object_object_add(res,"jobid",jsStr);
270    }
271    const struct tm *tm;
272    size_t len;
273    time_t now;
274    char *tmp1;
275    map *tmpStatus;
276 
277    now = time ( NULL );
278    tm = localtime ( &now );
279
280    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
281    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%H:%M:%SZ", tm );
282    json_object *jsStr0=json_object_new_string(tmp1);
283    json_object_object_add(res,"datetime",jsStr0);
284    free(tmp1);
285   
286    switch(step){
287    case 0: {
288      // Create a new analyze
289      maps* lenv=getMaps(conf,"lenv");
290      sid=getMapFromMaps(conf,"renv","xrequest");
291      if(sid!=NULL){
292        json_object *jsStr=json_object_new_string(sid->value);
293        json_object_object_add(res,"request_execute_content",jsStr);
294      }
295      sid=getMapFromMaps(conf,"lenv","identifier");
296      if(sid!=NULL){
297        json_object *jsStr=json_object_new_string(sid->value);
298        json_object_object_add(res,"process_identifier",jsStr);
299      }
300      // Save the Execute request on disk
301      map* tmpPath=getMapFromMaps(conf,"main","tmpPath");
302      map* req=getMapFromMaps(conf,"renv","xrequest");
303      sid=getMapFromMaps(conf,"lenv","usid");
304      char* executePath=(char*)malloc((strlen(tmpPath->value)+strlen(sid->value)+14)*sizeof(char));
305      sprintf(executePath,"%s/execute_%s.xml",tmpPath->value,sid->value);
306      FILE* saveExecute=fopen(executePath,"wb");
307      fwrite(req->value,1,strlen(req->value)*sizeof(char),saveExecute);
308      fflush(saveExecute);
309      fclose(saveExecute);
310      setMapInMaps(conf,"lenv","execute_file",executePath);
311      free(executePath);
312      break;
313    }
314     
315    case 1: {
316      // Update the execute request stored on disk at step 0,0 to modify the references used.
317      if(state==1){
318        maps* curs=inputs;
319        xmlInitParser();
320        map* xmlPath=getMapFromMaps(conf,"lenv","execute_file");
321        while(curs!=NULL){
322          map* length=getMap(curs->content,"length");
323          map* useMS=getMap(curs->content,"useMapserver");
324          if(length==NULL){
325            addToMap(curs->content,"length","1");
326            length=getMap(curs->content,"length");
327          }
328          int len=atoi(length->value);
329          for(int ii=0;ii<len;ii++){
330            if(getMapArray(curs->content,"byValue",ii)!=NULL && getMapArray(curs->content,"mimeType",ii)!=NULL && useMS!=NULL && strncasecmp(useMS->value,"true",4)==0){
331              map* tmpMap=getMapArray(curs->content,"value",ii);
332              char tmpStr[100];
333              sprintf(tmpStr,"%d",strlen(tmpMap->value));
334              setMapArray(curs->content,"size",ii,tmpStr);
335              tmpMap=getMapArray(curs->content,"mimeType",ii);
336              setMapArray(curs->content,"fmimeType",ii,tmpMap->value);
337              tmpMap=getMapArray(curs->content,"cache_file",ii);
338              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
339              setMapArray(curs->content,"storage",ii,tmpMap->value);
340              setReferenceUrl(conf,curs);
341              addIntToMap(curs->content,"published_id",ii+1);
342              const char *params[7];
343              int xmlLoadExtDtdDefaultValue;
344              int hasFile=-1;
345              map* xslPath=getMapFromMaps(conf,"callback","template");
346              map* filePath=getMapArray(curs->content,"ref_wfs_link",ii);
347              if(filePath==NULL)
348                filePath=getMap(curs->content,"ref_wcs_link");
349              char* inputName=curs->name;
350              if(xslPath==NULL || xmlPath==NULL || filePath==NULL)
351                break;
352              char *tmpParam=(char*)malloc((strlen(curs->name)+11)*sizeof(char));
353              char *tmpParam1=(char*)malloc((strlen(filePath->value)+11)*sizeof(char));
354              char tmpParam2[16];
355              sprintf(tmpParam2,"string(\"%d\")",ii);
356              setMapArray(curs->content,"href",ii,filePath->value);
357              setMapArray(curs->content,"xlink:href",ii,filePath->value);
358              tmpMap=getMapArray(curs->content,"cache_url",ii);
359              if(tmpMap!=NULL)
360                setMapArray(curs->content,"xlink:href",ii,tmpMap->value);
361              else
362                setMapArray(curs->content,"xlink:href",ii,filePath->value);
363              sprintf(tmpParam,"string(\"%s\")",curs->name);
364              sprintf(tmpParam1,"string(\"%s\")",filePath->value);
365              sprintf(tmpParam2,"string(\"%d\")",ii);
366              params[0]="attr";
367              params[1]=tmpParam;
368              params[2]="value";
369              params[3]=tmpParam1;//filePath->value;
370              params[4]="cnt";
371              params[5]=tmpParam2;
372              params[6]=NULL;
373              fprintf(stderr, "## XSLT PARAMETERS ATTR: %s VALUE: %s INDEX: %s\n",
374                      tmpParam,tmpParam1,tmpParam2);
375              fflush(stderr);
376              xmlSubstituteEntitiesDefault(1);
377              xmlLoadExtDtdDefaultValue = 0;
378              xsltStylesheetPtr cur = NULL;
379              xmlDocPtr doc, res;
380              cur = xsltParseStylesheetFile(BAD_CAST xslPath->value);
381              doc = xmlParseFile(xmlPath->value);
382              fflush(stderr);
383              res = xsltApplyStylesheet(cur, doc, params);
384              xmlChar *xmlbuff;
385              int buffersize;
386              xmlDocDumpFormatMemory(res, &xmlbuff, &buffersize, 1);
387              // Store the executeRequest in file again
388              free(tmpParam);
389              free(tmpParam1);
390              fprintf(stderr," # Request / XSLT: %s\n",xmlbuff);
391              fflush(stderr);
392              FILE* saveExecute=fopen(xmlPath->value,"wb");
393              if(saveExecute!=NULL){
394                fwrite(xmlbuff,1,buffersize,saveExecute);
395                fflush(saveExecute);
396                fclose(saveExecute);
397              }
398              xmlFree(xmlbuff);
399              xmlFreeDoc(doc);
400              xsltFreeStylesheet(cur);
401            }
402          }
403          addIntToMap(curs->content,"published_id",0);
404          curs=curs->next;
405        }
406        xmlCleanupParser();
407        FILE* f0=fopen(xmlPath->value,"rb");
408        if(f0!=NULL){
409          long flen;
410          char *fcontent;
411          fseek (f0, 0, SEEK_END);
412          flen = ftell (f0);
413          fseek (f0, 0, SEEK_SET);
414          fcontent = (char *) malloc ((flen + 1) * sizeof (char));
415          fread(fcontent,flen,1,f0);
416          fcontent[flen]=0;
417          fclose(f0);
418          map *schema=getMapFromMaps(conf,"database","schema");
419          map* sid=getMapFromMaps(conf,"lenv","usid");
420          char *req=(char*)malloc((flen+strlen(schema->value)+strlen(sid->value)+66)*sizeof(char));
421          sprintf(req,"UPDATE %s.services set request_execute_content=$$%s$$ WHERE uuid=$$%s$$",schema->value,fcontent,sid->value);
422          execSql(conf,1,req);
423          free(fcontent);
424          free(req);
425        }
426      }
427
428      // Fetching data inputs
429      maps* curs=inputs;
430      dumpMaps(curs);
431      char *keys[11][2]={
432        {
433          "xlink:href",
434          "ref_download_link"
435        },
436        {
437          "cache_file",
438          "cachefile"
439        },
440        {
441          "fmimeType",
442          "mimetype"
443        },
444        {
445          "size",
446          "size"
447        },
448        {
449          "ref_wms_link",
450          "ref_wms_link"
451        },
452        {
453          "ref_wfs_link",
454          "ref_wfs_link"
455        },
456        {
457          "ref_wcs_link",
458          "ref_wcs_link"
459        },
460        {
461          "ref_wcs_link",
462          "ref_wcs_link"
463        },
464        {
465          "ref_wcs_preview_link",
466          "ref_wcs_preview_link"
467        },
468        {
469          "geodatatype",
470          "datatype"
471        },
472        {
473          "wgs84_extent",
474          "boundingbox"
475        }       
476      };
477      json_object *res1=json_object_new_object();
478      while(curs!=NULL){
479        if(getMap(curs->content,"length")==NULL){
480          addToMap(curs->content,"length","1");
481        }
482        map* length=getMap(curs->content,"length");
483        int len=atoi(length->value);
484        json_object *res3;
485        int hasRef=-1;
486        for(int ii=0;ii<len;ii++){
487          map* tmpMap=getMapArray(curs->content,"cache_file",ii);
488          sid=getMapArray(curs->content,"ref_wms_link",ii);
489          json_object *res2=json_object_new_object();
490          if(tmpMap!=NULL){
491            if(sid==NULL){
492              setMapArray(curs->content,"generated_file",ii,tmpMap->value);
493              setMapArray(curs->content,"storage",ii,tmpMap->value);
494            }
495            struct stat buf;
496            char timeStr[ 100 ] = "";
497            if (stat(tmpMap->value, &buf)==0){
498              strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
499              json_object *jsStr=json_object_new_string(timeStr);
500              json_object_object_add(res2,"creation_date",jsStr);
501            }
502            tmpMap=getMapArray(curs->content,"fmimeType",ii);
503            if(tmpMap!=NULL){
504              setMapArray(curs->content,"mimeType",ii,tmpMap->value);
505            }
506            setReferenceUrl(conf,curs);
507          }else{         
508          }
509          addIntToMap(curs->content,"published_id",ii+1);
510          int i=0;
511          for(;i<11;i++){
512            sid=getMapArray(curs->content,keys[i][0],ii);
513            if(sid!=NULL){
514              json_object *jsStr=json_object_new_string(sid->value);
515              json_object_object_add(res2,keys[i][1],jsStr);
516              if(i==0){
517                hasRef=1;
518                json_object *jsStr1=json_object_new_string(getProvenance(conf,sid->value));
519                json_object_object_add(res2,"dataOrigin",jsStr1);
520              }
521            }
522          }
523          if(len>1){
524            if(ii==0)
525              res3=json_object_new_array();
526            json_object_array_add(res3,res2);
527          }else
528            res3=res2;
529        }
530        if(hasRef<0)
531          json_object_put(res3);
532        else{
533          json_object_object_add(res1,curs->name,json_object_get(res3));
534          json_object_put(res3);
535        }
536        addIntToMap(curs->content,"published_id",0);
537        curs=curs->next;
538      }
539      json_object_object_add(res,"inputs",res1);
540      break;
541    }
542     
543    case 2: {
544      // Uploading data input to cluster
545      maps* in=getMaps(conf,"uploadQueue");
546      if(in!=NULL){
547        maps* curs=in;
548        map* length=getMapFromMaps(in,"uploadQueue","length");
549        if(length!=NULL){
550          json_object *res1=json_object_new_object();
551          int limit=atoi(length->value);
552          int i=0;
553          maps* uploadQueue=getMaps(in,"uploadQueue");
554          map* tmp=uploadQueue->content;
555          for(;i<limit;i++){
556            map* tmp0=getMapArray(tmp,"input",i);
557            map* tmp1=getMapArray(tmp,"localPath",i);
558            map* tmp2=getMapArray(tmp,"targetPath",i);
559            if(tmp0!=NULL && tmp1!=NULL && tmp2!=NULL){
560              json_object *res2=json_object_new_object();
561              json_object *jsStr=json_object_new_string(tmp1->value);
562              json_object_object_add(res2,"local_path",jsStr);
563              jsStr=json_object_new_string(tmp2->value);
564              json_object_object_add(res2,"target_path",jsStr);
565              json_object *res4=json_object_object_get(res1,tmp0->value);
566              if(json_object_is_type(res4,json_type_null)){
567                json_object_object_add(res1,tmp0->value,res2);
568              }else{
569                if(json_object_is_type(res4,json_type_object) && !json_object_is_type(res4, json_type_array)){
570                  json_object *res3=json_object_new_array();
571                  json_object_array_add(res3,json_object_get(res4));
572                  json_object_array_add(res3,res2);
573                  json_object_object_del(res1,tmp0->value);
574                  json_object_object_add(res1,tmp0->value,res3);
575                }else
576                  json_object_array_add(res4,res2);
577              }
578            }
579          }
580          json_object_object_add(res,"inputs",res1);
581        }
582      }
583      break;
584    }
585     
586    case 3: {
587      // Generating job script
588      sid=getMapFromMaps(conf,"lenv","local_script");
589      if(sid!=NULL){
590        json_object *jsStr=json_object_new_string(sid->value);
591        json_object_object_add(res,"script",jsStr);
592      }
593      break;
594    }
595     
596    case 4: {
597      // Submitting job to cluster
598      sid=getMapFromMaps(conf,"lenv","remote_script");
599      if(sid!=NULL){
600        json_object *jsStr=json_object_new_string(sid->value);
601        json_object_object_add(res,"script",jsStr);
602      }
603      break;
604    }
605     
606    case 5: {
607      // Downloading process outputs from cluster
608      maps* curs=outputs;
609      dumpMaps(curs);
610      char *keys[10][2]={
611        {
612          "Reference",
613          "ref"
614        },
615        {
616          "generated_file",
617          "cachefile"
618        },
619        {
620          "mimeType",
621          "mimetype"
622        },
623        {
624          "size",
625          "size"
626        },
627        {
628          "geodatatype",
629          "datatype"
630        },
631        {
632          "wgs84_extent",
633          "boundingbox"
634        },
635        {
636          "ref_wms_link",
637          "ref_wms_link"
638        },
639        {
640          "ref_wcs_link",
641          "ref_wcs_link"
642        },
643        {
644          "ref_wcs_preview_link",
645          "ref_wcs_preview_link"
646        },
647        {
648          "ref_wfs_link",
649          "ref_wfs_link"
650        }       
651      };
652      char* specifics[5][2]={
653        {
654          "download_link",
655          "ref_download_link"
656        },
657        {
658          "wms_link",
659          "ref_wms_link"
660        },
661        {
662          "wfs_link",
663          "ref_wfs_link"
664        },
665        {
666          "wcs_link",
667          "ref_wcs_link"
668        },
669        {
670          "wcs_link",
671          "ref_wcs_preview_link"
672        }
673      };
674      json_object *res1=json_object_new_object();
675      while(curs!=NULL){       
676        json_object *res2=json_object_new_object();
677        int i=0;
678        int hasRef=-1;
679        for(;i<10;i++){
680          sid=getMap(curs->content,keys[i][0]);
681          if(sid!=NULL){
682            json_object *jsStr=json_object_new_string(sid->value);
683            json_object_object_add(res2,keys[i][1],jsStr);
684            if(i==0){
685              hasRef=1;
686              json_object_object_add(res2,"ref_download_link",jsStr);
687            }
688            if(i==1){
689              struct stat buf;
690              char timeStr[ 100 ] = "";
691              if (stat(sid->value, &buf)==0){
692                strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
693                json_object *jsStr=json_object_new_string(timeStr);
694                json_object_object_add(res2,"creation_date",jsStr);
695              }
696            }
697          }
698        }
699        if(hasRef>0)
700          json_object_object_add(res1,curs->name,res2);
701        else{
702          maps* curs0=curs->child;
703          int i=0;
704          int bypass=-1;
705          for(i=0;i<5;i++){
706            maps* specificMaps;
707            if((specificMaps=getMaps(curs0,specifics[i][0]))!=NULL){
708              int hasRef0=-1;
709              int i0=0;
710              for(;i0<6;i0++){
711                sid=getMap(specificMaps->content,keys[i0][0]);
712                if(sid!=NULL){
713                  json_object *jsStr=json_object_new_string(sid->value);
714                  if(i0==0){
715                    json_object_object_add(res2,specifics[i][1],jsStr);
716                  }
717                  else
718                    json_object_object_add(res2,keys[i0][1],jsStr);
719                  hasRef0=1;
720                  bypass=1;
721                  if(i==1){
722                    struct stat buf;
723                    char timeStr[ 100 ] = "";
724                    if (stat(sid->value, &buf)==0){
725                      strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
726                      json_object *jsStr=json_object_new_string(timeStr);
727                      json_object_object_add(res2,"creation_date",jsStr);
728                    }
729                  }
730                }
731              }       
732            }
733          }
734          if(bypass<0)
735            while(curs0!=NULL){
736              json_object *res3=json_object_new_object();
737              int i0=0;
738              int hasRef0=-1;
739              for(;i0<10;i0++){
740                sid=getMap(curs0->content,keys[i0][0]);
741                if(sid!=NULL){
742                  json_object *jsStr=json_object_new_string(sid->value);
743                  json_object_object_add(res3,keys[i0][1],jsStr);
744                  hasRef0=1;
745                }
746              }
747              if(hasRef0<0)
748                json_object_put(res3);
749              else
750                json_object_object_add(res2,curs0->name,res3);
751              curs0=curs0->next;
752            }
753          json_object_object_add(res1,curs->name,res2);
754        }
755        curs=curs->next;
756      }
757      json_object_object_add(res,"outputs",res1);
758      break;
759    }
760     
761    case 6: {
762      // Finalize HPC
763      char *keys[6][2]={
764        {
765          "SubmitTime",
766          "hpc_submission_date"
767        },
768        {
769          "JobId",
770          "hpc_job_identifier"
771        },
772        {
773          "JobName",
774          "hpc_job_name"
775        },
776        {
777          "StartTime",
778          "hpc_start_date"
779        },
780        {
781          "EndTime",
782          "hpc_end_date"
783        },
784        {
785          "JobState",
786          "hpc_status"
787        }       
788      };
789      int i=0;
790      if(getMaps(conf,"henv")!=NULL){
791        for(i=0;i<6;i++){
792          sid=getMapFromMaps(conf,"henv",keys[i][0]);
793          if(sid!=NULL){
794            json_object *jsStr=json_object_new_string(sid->value);
795            json_object_object_add(res,keys[i][1],jsStr);
796          }
797        }
798      }
799      if((sid=getMapFromMaps(conf,"henv","billing_nb_cpu"))!=NULL){
800        json_object *jsStr=json_object_new_string(sid->value);
801        json_object_object_add(res,"hpc_cpu_usage",jsStr);
802      }else{
803        json_object *jsStr=json_object_new_string("1");
804        json_object_object_add(res,"hpc_cpu_usage",jsStr);
805      }
806      json_object *jsStr=json_object_new_string("succeeded");
807      json_object_object_add(res,"wps_status",jsStr);
808      break;
809    }
810     
811    case 7: {
812      // Error or Dismiss
813      sid=getMapFromMaps(conf,"lenv","message");
814      if(sid!=NULL){
815        json_object *jsStr=json_object_new_string(sid->value);
816        json_object_object_add(res,"message",jsStr);
817      }
818      json_object *jsStr;
819      if(state==1)
820        jsStr=json_object_new_string("dismissed");
821      else
822        jsStr=json_object_new_string("failed");
823      json_object_object_add(res,"wps_status",jsStr);
824      break;
825    }
826    others: {
827        break;
828      }
829    }
830
831    if(local_arguments==NULL)
832      local_arguments=(local_params**)malloc(sizeof(local_params*));
833    else
834      local_arguments=(local_params**)realloc(local_arguments,(nbThreads+1)*sizeof(local_params*));
835    local_arguments[nbThreads]=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 
836    local_arguments[nbThreads]->conf=conf;
837    local_arguments[nbThreads]->url=url;
838    local_arguments[nbThreads]->res=res;
839    local_arguments[nbThreads]->step=step;
840    local_arguments[nbThreads]->state=state;
841    if(myThreads==NULL)
842      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
843    else
844      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
845    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)local_arguments[nbThreads])==-1){
846      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
847      return false;
848    }
849    nbThreads++;
850    return true;
851  }
852
853  /**
854   * Wait for the threads to end then, clean used memory.
855   */
856  void cleanupCallbackThreads(){
857    int i=0;
858    for(i=0;i<nbThreads;i++){
859      pthread_join(myThreads[i],NULL);
860      free(local_arguments[i]);
861    }
862    free(local_arguments);
863    free(myThreads);
864  }
865
866#ifdef __cplusplus
867}
868#endif
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png