source: branches/prototype-v0/zoo-project/zoo-kernel/service_internal_hpc.c @ 860

Last change on this file since 860 was 860, checked in by djay, 6 years ago

Add status_code key to the lenv section to support returning a specific HTTP error code from the service code. Fix callback invocation to support inputs arrays at step 1 and 2. Fix issue with cpu usage. Fix issue with mapserver publication when an input is optional. Fix callback invocation at step 7 in case the service has failed on the HPC side.

  • Property svn:keywords set to Id
File size: 36.5 KB
Line 
1/*
2 * Author : Gérald FENOY
3 *
4 * Copyright (c) 2017 GeoLabs SARL
5 *
6 * This work was supported by public funds received in the framework of GEOSUD,
7 * a project (ANR-10-EQPX-20) of the program "Investissements d'Avenir" managed
8 * by the French National Research Agency
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 */
29
30#include "service_internal_hpc.h"
31#include "response_print.h"
32#include "server_internal.h"
33#include "service_callback.h"
34#include "mimetypes.h"
35#include <sys/un.h>
36
37typedef struct {
38  maps* conf;
39  char* local_file;
40  char* target_file;
41} local_params;
42
43/**
44 * Add nested outputs to every outputs that is geographic format
45 * @see isGeographic
46 * @param s the service current definition
47 */ 
48void addNestedOutputs(service** s){
49  if((*s)==NULL){
50    return;
51  }   
52  if(*s==NULL || (*s)->outputs==NULL){
53    return;
54  }
55  elements *out=(*s)->outputs;
56  elements* cur=out;
57  map* serviceType=getMap((*s)->content,"ServiceType");
58  if(strncmp(serviceType->value,"HPC",3)!=0)
59    return;
60  while(cur!=NULL && cur->defaults!=NULL){
61    map* mimeType=getMap(cur->defaults->content,"mimeType");
62    if(mimeType!=NULL){
63      int geo=isGeographic(mimeType->value);
64      if(geo>0){
65        elements *tmp[3]={
66          dupElements(cur),
67          dupElements(cur),
68          dupElements(cur)
69        };
70        char *geoLink="wcs_link";
71        if(geo==2){
72          geoLink="wfs_link";
73        }
74        int i=0;
75        for(;i<3;i++){
76          if(tmp[i]->next!=NULL){
77            freeElements(&tmp[i]->next);
78            free(tmp[i]->next);
79            tmp[i]->next=NULL;
80          }
81          free(tmp[i]->name);
82          if(tmp[i]->format!=NULL)
83            free(tmp[i]->format);
84          tmp[i]->format=zStrdup("ComplexData");
85          freeMap(&tmp[i]->content);
86          free(tmp[i]->content);
87          tmp[i]->content=NULL;
88          switch(i){
89          case 0:
90            tmp[i]->name=zStrdup("download_link");
91            tmp[i]->content=createMap("Title",_("Download link"));
92            addToMap(tmp[i]->content,"Abstract",_("The download link"));
93            addToMap(tmp[i]->defaults->content,"useMapserver","false");
94            if(tmp[i]->supported!=NULL){
95              freeIOType(&tmp[i]->supported);
96              free(tmp[i]->supported);
97              tmp[i]->supported=NULL;
98            }
99            break;
100          case 1:
101            tmp[i]->name=zStrdup("wms_link");
102            tmp[i]->content=createMap("Title",_("WMS link"));
103            addToMap(tmp[i]->content,"Abstract",_("The WMS link"));
104            if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL){
105              freeIOType(&tmp[i]->supported->next);
106              free(tmp[i]->supported->next);
107              tmp[i]->supported->next=NULL;
108            }else{
109              if(tmp[i]->supported!=NULL)
110                addToMap(tmp[i]->supported->content,"useMapserver","true");
111              addToMap(tmp[i]->defaults->content,"useMapserver","true");
112            }
113            break;
114          case 2:
115            if(geo==2){
116              tmp[i]->name=zStrdup("wfs_link");
117              tmp[i]->content=createMap("Title",_("WFS link"));
118              addToMap(tmp[i]->content,"Abstract",_("The WFS link"));
119            }else{
120              tmp[i]->name=zStrdup("wcs_link");
121              tmp[i]->content=createMap("Title",_("WCS link"));
122              addToMap(tmp[i]->content,"Abstract",_("The WCS link"));
123            }
124            if(tmp[i]->supported!=NULL && tmp[i]->supported->next!=NULL &&
125               tmp[i]->supported->next->content!=NULL){
126              freeIOType(&tmp[i]->supported);
127              free(tmp[i]->supported);
128              tmp[i]->supported=NULL;
129              tmp[i]->supported=createIoType();
130              iotype* cnext=cur->supported->next;
131              tmp[i]->supported->content=createMap(cnext->content->name,cnext->content->value);
132              addMapToMap(&tmp[i]->supported->content,cnext->content->next);
133              addToMap(tmp[i]->supported->content,"useMapserver","true");
134            }else
135              addToMap(tmp[i]->defaults->content,"useMapserver","true");
136            break;
137          }
138        }
139        addToElements(&cur->child,tmp[0]);
140        addToElements(&cur->child,tmp[1]);
141        addToElements(&cur->child,tmp[2]);
142        free(cur->format);
143        cur->format=NULL;
144        if(cur->defaults!=NULL){
145          freeIOType(&cur->defaults);
146          free(cur->defaults);
147          cur->defaults=NULL;
148        }
149        if(cur->supported!=NULL){
150          freeIOType(&cur->supported);
151          free(cur->supported);
152          cur->supported=NULL;
153        }
154        freeElements(&tmp[2]);
155        free(tmp[2]);
156        freeElements(&tmp[1]);
157        free(tmp[1]);
158        freeElements(&tmp[0]);
159        free(tmp[0]);
160        //addToMap(cur->content,"internal","true");
161      }
162    }
163    cur=cur->next;
164  }
165  //dumpElements((*s)->outputs);
166}
167
168/**
169 * Acquire a read lock on every files used as input for executing a service.
170 * @param conf the main configuration file map
171 * @return 0 if every file can be locked, -1 if one lock has failed.
172 */
173int addReadLocks(maps** conf){
174  map* queueLengthMap=getMapFromMaps(*conf,"uploadQueue","length");
175  maps* queueMaps=getMaps(*conf,"uploadQueue");
176  if(queueLengthMap!=NULL){
177    int cnt=atoi(queueLengthMap->value);
178    int i=0;
179    for(i=0;i<cnt;i++){
180      map* argv[2]={
181        getMapArray(queueMaps->content,"input",i),
182        getMapArray(queueMaps->content,"localPath",i)
183      };
184      zooLock* lck;
185      if((lck=lockFile(*conf,argv[1]->value,'r'))==NULL){
186        char* templateStr=_("Unable to lock the file for %s in read mode.");
187        char *tmpMessage=(char*)malloc((strlen(templateStr)+strlen(argv[0]->value)+1)*sizeof(char));
188        sprintf(tmpMessage,templateStr,argv[0]->value);
189        setMapInMaps(*conf,"lenv","message",tmpMessage);
190        free(tmpMessage);
191        return -1;
192      }else{
193        if(zoo_file_locks_cnt==0){
194          zoo_file_locks=(zooLock**)malloc(sizeof(zooLock*));
195        }
196        else{
197          zoo_file_locks=(zooLock**)realloc(zoo_file_locks,(zoo_file_locks_cnt+1)*sizeof(zooLock*));
198        }
199        zoo_file_locks[zoo_file_locks_cnt]=lck;
200        zoo_file_locks_cnt++;
201      }
202    }
203  }
204  return 0;
205}
206
207/**
208 * Remove all read locks set for files used as input for executing the service.
209 * @param conf the main configuration maps pointer
210 * @return 0 in case of success, -1 if any error occured. In case of error, one
211 * can refer to the message map array from the lenv section.
212 */
213int removeReadLocks(maps** conf){
214  int res=0;
215  int nberr=0;
216  map* queueLengthMap=getMapFromMaps(*conf,"uploadQueue","length");
217  maps* queueMaps=getMaps(*conf,"uploadQueue");
218  if(queueLengthMap!=NULL){
219    int cnt=atoi(queueLengthMap->value);
220    int i=0;
221    for(i=0;i<cnt;i++){
222      if(unlockFile(*conf,zoo_file_locks[i])<1){
223        map* argv=getMapArray(queueMaps->content,"input",i);
224        char* templateStr=_("Unable to unlock the file for %s after execution.");
225        char *tmpMessage=(char*)malloc((strlen(templateStr)+strlen(argv->value)+1)*sizeof(char));
226        sprintf(tmpMessage,templateStr,argv->value);
227        maps* lenv=getMaps(*conf,"lenv");
228        setMapArray(lenv->content,"message",nberr,tmpMessage);
229        free(tmpMessage);
230        res=-1;
231        nberr++;
232      }
233    }
234  }
235  free(zoo_file_locks);
236  return res;
237}
238
239/**
240 * Get the section name depending on number of features and/or pixels of each
241 * inputs and the threshold defined in a section.
242 * It supposes that your inputs has been published using MapServer support,
243 * implying that the number of features (nb_features), respectively pixels
244 * (nb_pixels), are defined. The section, identified by confId, should contain
245 * preview_max_features and preview_max_pixels defining the threshold values.
246 * @param conf the main configuration file maps pointer
247 * @param inputs the inputs maps pointer
248 * @param confId the section identifier
249 * @return "preview_conf" in case the numbers are lower than the threshold,
250 * "fullres_conf" in other cases.
251 */
252char* getConfiguration(maps** conf,maps** inputs,const char* confId){
253  maps* input=*inputs;
254  map* max_pixels=getMapFromMaps(*conf,confId,"preview_max_pixels");
255  map* max_features=getMapFromMaps(*conf,confId,"preview_max_features");
256  int i_max_pixels=atoi(max_pixels->value);
257  int i_max_features=atoi(max_features->value);
258  while(input!=NULL && input->content!=NULL){
259    map* tmpMap=getMap(input->content,"geodatatype");
260    if(tmpMap!=NULL){
261      map* currentNb;
262      if(strcasecmp(tmpMap->value,"raster")==0 ){
263        currentNb=getMap(input->content,"nb_pixels");
264        if(atoi(currentNb->value)>i_max_pixels)
265          return "fullres_conf";
266      }else{
267        if(strcasecmp(tmpMap->value,"vector")==0 ){
268          currentNb=getMap(input->content,"nb_features");
269          if(atoi(currentNb->value)>i_max_features)
270            return "fullres_conf";
271        }
272      }
273    }
274    input=input->next;
275  }
276  return "preview_conf";
277}
278
279/**
280 * Load and run a HPC Application corresponding to the service.
281 *
282 * @param main_conf the conf maps containing the main.cfg settings
283 * @param request the map containing the HTTP request
284 * @param s the service structure
285 * @param real_inputs the maps containing the inputs
286 * @param real_outputs the maps containing the outputs
287 * @return SERVICE_SUCCEEDED in case of success, -1 or SERVICE_FAILED when failing.
288 */
289int zoo_hpc_support(maps** main_conf,map* request,service* s,maps **real_inputs,maps **real_outputs){
290  maps* m=*main_conf;
291  maps* inputs=*real_inputs;
292  maps* outputs=*real_outputs;
293  map* tmp0=getMapFromMaps(*main_conf,"lenv","cwd");
294  char *ntmp=tmp0->value;
295  map* tmp=NULL;
296  int res=-1;
297  // Get the configuration id depending on service type and defined thresholds
298  // then, set the configId key in the lenv section
299  char *serviceType;
300  map* mServiceType=getMap(s->content,"serviceType");
301  if(mServiceType!=NULL)
302    serviceType=mServiceType->value;
303  else
304    serviceType="HPC";
305  map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");
306  map* uuid=getMapFromMaps(*main_conf,"lenv","usid");
307  map* confMap=getMapFromMaps(*main_conf,serviceType,getConfiguration(main_conf,real_inputs,serviceType));
308  char * configurationId=confMap->value;
309  setMapInMaps(*main_conf,"lenv","configId",configurationId);
310  // Dump lenv maps again after having set the configId ...
311  char *flenv =
312    (char *)
313    malloc ((strlen (tmpPath->value) + 
314             strlen (uuid->value) + 12) * sizeof (char));
315  sprintf (flenv, "%s/%s_lenv.cfg", tmpPath->value, uuid->value);
316  maps* lenvMaps=getMaps(m,"lenv");
317  dumpMapsToFile(lenvMaps,flenv,0);
318  free(flenv);
319
320  map* targetPathMap=getMapFromMaps(*main_conf,configurationId,"remote_data_path");
321 
322  pthread_t threads_pool[50];
323  // Force the HPC services to be called asynchronously
324  map* isAsync=getMapFromMaps(*main_conf,"lenv","async");
325  if(isAsync==NULL){
326    errorException(*main_conf,_("The synchronous mode is not supported by this type of service"),"NoSuchMode",s->name);
327    return -1;
328  }
329
330  maps* input=*real_inputs;
331  char **parameters=NULL;
332  int parameters_cnt=0;
333  while(input!=NULL && input->content!=NULL){
334    if(getMaps(*real_outputs,input->name)==NULL){
335      parameters_cnt+=1;
336      if(parameters_cnt==1)
337        parameters=(char**)malloc(parameters_cnt*sizeof(char*));
338      else
339        parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
340      if(getMap(input->content,"mimeType")!=NULL){
341        // Input is ComplexData
342        if(getMap(input->content,"cache_file")==NULL){
343          // Input data has been passed by value
344          // TODO: publish input through MapServer / use output publication
345          dumpMapsValuesToFiles(main_conf,&input);
346          addToMap(input->content,"toPublish","true");
347          addToMap(input->content,"useMapserver","true");
348        }
349        if(getMap(input->content,"cache_file")!=NULL){
350          map* length=getMap(input->content,"length");
351          if(length==NULL){
352            addToMap(input->content,"length","1");
353            length=getMap(input->content,"length");
354          }
355          int len=atoi(length->value);
356          int i=0;
357          for(i=0;i<len;i++){
358            map* tmp=getMapArray(input->content,"cache_file",i);
359            char* targetName=strrchr(tmp->value,'/');
360            char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
361            sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
362            setMapArray(input->content,"targetPath",i,targetPath);
363            setMapArray(input->content,"localPath",i,tmp->value);
364            addToUploadQueue(main_conf,input);
365            if(i==0){
366              parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
367              sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
368            }else{
369              char *tmpStr=zStrdup(parameters[parameters_cnt-1]);
370              parameters[parameters_cnt-1]=(char*)realloc(parameters[parameters_cnt-1],(strlen(tmpStr)+strlen(targetPath)+2)*sizeof(char));
371              sprintf(parameters[parameters_cnt-1],"%s %s",tmpStr,targetPath);
372              free(tmpStr);
373            }
374            free(targetPath);
375          }
376        }else{
377          // ???
378          fprintf(stderr,"%s %d\n",__FILE__,__LINE__);
379          fflush(stderr);
380        }
381      }else{
382        // LitteralData and BboxData
383        if(getMap(input->content,"dataType")!=NULL){
384          // For LitteralData, simply pass the value
385          map* val=getMap(input->content,"value");
386          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(val->value)+3)*sizeof(char));
387          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,val->value);
388        }
389      }
390    }
391    input=input->next;
392  }
393
394#ifdef HPC_DEBUG
395  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
396#endif
397  invokeCallback(m,inputs,NULL,1,1);
398  if(getMapFromMaps(m,"lenv","mapError")!=NULL){
399    invokeCallback(m,inputs,NULL,7,0);
400    return -1;
401  }
402  invokeCallback(m,inputs,NULL,2,0);
403#ifdef HPC_DEBUG
404  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
405  dumpMaps(inputs);
406#endif
407
408  // Upload data on HPC
409  if(runUpload(main_conf)==false){
410    errorException (m, _("Unable to lock the file for upload!"),
411                    "InternalError", NULL);
412#ifdef HPC_DEBUG
413    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
414#endif
415    invokeCallback(m,inputs,NULL,7,0);
416#ifdef HPC_DEBUG
417    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
418#endif
419    return -1;
420  }
421#ifdef HPC_DEBUG
422  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
423#endif
424  invokeCallback(m,inputs,NULL,2,1);
425#ifdef HPC_DEBUG
426  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
427#endif
428
429  // Add the filename to generate for every output to parameters
430  input=*real_outputs;
431  // TODO: fix appendOutputParameters
432  //appendOutputParameters(input,parameters,&parameters_cnt,s,uuid,targetPathMap);
433#ifdef HPC_DEBUG
434  dumpMaps(input);
435#endif
436  while(input!=NULL){
437    // TODO: parse all outputs including inner outputs if required.
438    if(input->child==NULL){
439      // Name every files that should be produced by the service execution
440      map* mime=getMap(input->content,"mimeType");
441      char* targetName;
442      if(mime!=NULL){
443        bool hasExt=false;
444        map* fileExt=getFileExtensionMap(mime->value,&hasExt);
445        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char));
446        sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value);
447        freeMap(&fileExt);
448        free(fileExt);
449      }else{
450        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char));
451        sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value);
452      }
453      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
454      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
455      free(targetName);
456      setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
457      if(strcasecmp(input->name,"wms_link")!=0&&
458         strcasecmp(input->name,"wcs_link")!=0 &&
459         strcasecmp(input->name,"wfs_link")!=0){
460        parameters_cnt+=1;
461        if(parameters_cnt==1)
462          parameters=(char**)malloc(parameters_cnt*sizeof(char*));
463        else
464          parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
465        // We should verify if any optional tag for output is required
466        // (i.e. -out output.tiff *int8*), meaning that we should search
467        // for a corresponding inputs name.
468        map* inValue=getMapFromMaps(*real_inputs,input->name,"value");
469        if(inValue!=NULL){
470          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char));
471          sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value);
472        }else{
473          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
474          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
475        }
476      }
477      free(targetPath);
478    }// In other case it means we need to return the cache_file as generated_file
479    else{
480      // Name every files that should be produced by the service execution
481      map* mime=getMap(input->child->content,"mimeType");
482      char* targetName;
483      if(mime!=NULL){
484        bool hasExt=false;
485        map* fileExt=getFileExtensionMap(mime->value,&hasExt);
486        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+strlen(fileExt->value)+11)*sizeof(char));
487        sprintf(targetName,"output_%s_%s_%s.%s",s->name,input->name,uuid->value,fileExt->value);
488        freeMap(&fileExt);
489        free(fileExt);
490      }else{
491        targetName=(char*)malloc((strlen(s->name)+strlen(input->name)+strlen(uuid->value)+14)*sizeof(char));
492        sprintf(targetName,"output_%s_%s_%s.tif",s->name,input->name,uuid->value);
493      }
494      char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
495      sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
496      free(targetName);
497      addToMap(input->content,"generated_file",targetPath);
498      addToMap(input->content,"storage",targetPath);
499      if(strcasecmp(input->name,"wms_link")!=0&&
500         strcasecmp(input->name,"wcs_link")!=0 &&
501         strcasecmp(input->name,"wfs_link")!=0){
502        parameters_cnt+=1;
503        if(parameters_cnt==1)
504          parameters=(char**)malloc(parameters_cnt*sizeof(char*));
505        else
506          parameters=(char**)realloc(parameters,parameters_cnt*sizeof(char*));
507        // We should verify if any optional tag for output is required
508        // (i.e. -out output.tiff *int8*), meaning that we should search
509        // for a corresponding inputs name.
510        map* inValue=getMapFromMaps(*real_inputs,input->name,"value");
511        if(inValue!=NULL){
512          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+strlen(inValue->value)+4)*sizeof(char));
513          sprintf(parameters[parameters_cnt-1],"-%s %s %s",input->name,targetPath,inValue->value);
514        }else{
515          parameters[parameters_cnt-1]=(char*)malloc((strlen(input->name)+strlen(targetPath)+3)*sizeof(char));
516          sprintf(parameters[parameters_cnt-1],"-%s %s",input->name,targetPath);
517        }
518      }
519      free(targetPath);
520    }
521    input=input->next;
522  }
523  // Produce the SBATCH File locally
524  char *scriptPath=(char*)malloc((strlen(s->name)+strlen(tmpPath->value)+strlen(uuid->value)+10)*sizeof(char));
525  sprintf(scriptPath,"%s/zoo_%s_%s.sh",tmpPath->value,s->name,uuid->value);
526  setMapInMaps(*main_conf,"lenv","local_script",scriptPath);
527#ifdef HPC_DEBUG
528  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
529  fflush(stderr);
530#endif
531  invokeCallback(m,inputs,NULL,3,0);
532#ifdef HPC_DEBUG
533  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
534  fflush(stderr);
535#endif
536  FILE* scriptFile=fopen(scriptPath,"w+");
537  map* headerMap=getMapFromMaps(*main_conf,configurationId,"jobscript_header");
538  if(headerMap!=NULL){
539    // Use the header file if defined in the HPC section of the main.cfg file
540    struct stat f_status;
541    int s=stat(headerMap->value, &f_status);
542    if(s==0){
543      char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
544      FILE* f=fopen(headerMap->value,"rb");
545      fread(fcontent,f_status.st_size,1,f);
546      int fsize=f_status.st_size;
547      fcontent[fsize]=0;
548      fclose(f);
549      fprintf(scriptFile,"%s\n### --- ZOO-Service HEADER end --- ###\n\n",fcontent);
550      free(fcontent);
551    }else
552      fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER (no header found) *** ###\n\n");
553  }else
554    fprintf(scriptFile,"#!/bin/bash\n\n### *** Default ZOO-Service HEADER *** ###\n\n");
555  maps* hpc_opts=getMaps(*main_conf,configurationId);
556  if(hpc_opts!=NULL){
557    map* hpc_opts_content=hpc_opts->content;
558    while(hpc_opts_content!=NULL){
559      if(strncasecmp(hpc_opts_content->name,"sbatch_options_",15)==0)
560        fprintf(scriptFile,"#SBATCH --%s=%s\n",strstr(hpc_opts_content->name,"sbatch_options_")+15,hpc_opts_content->value);
561      hpc_opts_content=hpc_opts_content->next;
562    }
563  }
564  fprintf(scriptFile,"#SBATCH --job-name=ZOO-Project_%s_%s\n\n",uuid->value,s->name);
565  map* mods=getMap(s->content,"hpcModules");
566  if(mods!=NULL)
567    fprintf(scriptFile,"#SBATCH --export=MODULES=%s\n",mods->value);
568
569  map* bodyMap=getMapFromMaps(*main_conf,configurationId,"jobscript_body");
570  if(bodyMap!=NULL){
571    // Use the header file if defined in the HPC section of the main.cfg file
572    struct stat f_status;
573    int s=stat(bodyMap->value, &f_status);
574    if(s==0){
575      char* fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
576      FILE* f=fopen(bodyMap->value,"rb");
577      fread(fcontent,f_status.st_size,1,f);
578      int fsize=f_status.st_size;
579      fcontent[fsize]=0;
580      fclose(f);
581      fprintf(scriptFile,"%s\n### --- ZOO-Service BODY end --- ###\n\n",fcontent);
582      free(fcontent);
583    }else
584      fprintf(scriptFile,"\n### *** Default ZOO-Service BODY (no body found) *** ###\n\n");
585  }else
586    fprintf(scriptFile,"\n### *** Default ZOO-Service BODY *** ###\n\n");
587
588  map* sp=getMap(s->content,"serviceProvider");
589 
590  // Require to produce the command line to be executed
591  fprintf(scriptFile,"\n\necho \"Job started at: $(date)\"\n");
592  fprintf(scriptFile,"echo \"Running service: [%s]\"\n",sp->value);
593  fprintf(scriptFile,"%s ",sp->value);
594  for(int i=0;i<parameters_cnt;i++){
595    fprintf(scriptFile," %s",parameters[i]);
596  }
597  for(int i=parameters_cnt-1;i>=0;i--){
598    free(parameters[i]);
599  }
600  free(parameters);
601  fprintf(scriptFile,"\n");
602  fprintf(scriptFile,"echo \"Job finished at: $(date)\"\n");
603  fflush(scriptFile);
604  fclose(scriptFile);
605#ifdef HPC_DEBUG
606  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
607#endif
608  invokeCallback(m,inputs,NULL,3,1);
609#ifdef HPC_DEBUG
610  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
611#endif
612
613  // Upload the SBATCH File to the remote host
614#ifdef HPC_DEBUG
615  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
616#endif
617  invokeCallback(m,inputs,NULL,4,0);
618#ifdef HPC_DEBUG
619  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
620#endif
621  targetPathMap=getMapFromMaps(*main_conf,configurationId,"remote_work_path");
622  if(targetPathMap==NULL){
623    setMapInMaps(*main_conf,"lenv","message",_("There is no remote_work_path defined in your section!"));
624    setMapInMaps(*main_conf,"lenv","status","failed");
625    errorException (m, _("There is no remote_work_path defined in your section!"),
626                    "InternalError", NULL);
627#ifdef HPC_DEBUG
628    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
629    fflush(stderr);
630#endif
631    invokeCallback(m,NULL,NULL,7,0);
632#ifdef HPC_DEBUG
633    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
634    fflush(stderr);
635#endif
636    return SERVICE_FAILED;
637  }
638  char* targetName=strrchr(scriptPath,'/');
639  char *targetPath=(char*)malloc((strlen(targetPathMap->value)+strlen(targetName)+2)*sizeof(char));
640  sprintf(targetPath,"%s/%s",targetPathMap->value,targetName);
641  setMapInMaps(*main_conf,"lenv","remote_script",targetPath);
642  SSHCON *test=ssh_connect(*main_conf);
643  int copy0=ssh_copy(*main_conf,scriptPath,targetPath,ssh_get_cnt(*main_conf));
644  unlink(scriptPath);
645  free(scriptPath);
646  if(copy0!=true){
647    setMapInMaps(m,"lenv","message",_("Unable to upload the script"));
648    invokeCallback(m,NULL,NULL,7,0);
649    errorException(m,_("Unable to upload the script"),"NoApplicableCode",NULL);
650    return -1;
651  }
652  // Execute the SBATCH script remotely
653  addReadLocks(main_conf);
654  map* subStr=getMapFromMaps(*main_conf,configurationId,"sbatch_substr");
655  char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+137)*sizeof(char));
656  sprintf(command,"sbatch %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value);
657  if(ssh_exec(*main_conf,command,ssh_get_cnt(m))<=0){
658    // The sbatch command has failed!
659    // Download the error log file from the HPC server
660    char tmpS[1024];
661    free(command);
662    command=(char*)malloc((strlen(targetPathMap->value)+strlen(uuid->value)+11)*sizeof(char));
663    sprintf(command,"%s/error_%s.log",targetPathMap->value,uuid->value);
664    targetName=strrchr(command,'/');
665    free(targetPath);
666    targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(targetName)+2)*sizeof(char));
667    sprintf(targetPath,"%s/%s",tmpPath->value,targetName);
668    if(ssh_fetch(*main_conf,targetPath,command,ssh_get_cnt(m))==0){
669      struct stat f_status;
670      int ts=stat(targetPath, &f_status);
671      if(ts==0) {
672        char* fcontent = NULL;
673        fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
674        FILE* f=fopen(targetPath,"rb");
675        fread(fcontent,f_status.st_size,1,f);
676        int fsize=f_status.st_size;
677        fcontent[fsize]=0;
678        fclose(f);
679        setMapInMaps(*main_conf,"lenv","message",fcontent);
680        free(fcontent);
681      }else
682        setMapInMaps(*main_conf,"lenv","message",_("No message provided"));
683    }else
684      setMapInMaps(*main_conf,"lenv","message",_("Unable to fetch the remote error log file"));
685    tmpPath=getMapFromMaps(m,"lenv","message");
686#ifdef HPC_DEBUG
687    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
688    fflush(stderr);
689#endif
690    invokeCallback(m,NULL,NULL,7,0);
691#ifdef HPC_DEBUG
692    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
693    fflush(stderr);
694#endif
695    sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s using %s: %s", s->name, configurationId, tmpPath->value);
696    errorException(m,tmpS,"NoApplicableCode",NULL);
697    free(command);
698    free(targetPath);
699    ssh_close(*main_conf);
700    removeReadLocks(main_conf);
701    sleep(1);
702    return -1;
703  }
704  free(targetPath);
705#ifdef HPC_DEBUG
706  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
707  fflush(stderr);
708#endif
709  invokeCallback(m,NULL,NULL,4,1);
710#ifdef HPC_DEBUG
711  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
712  fflush(stderr);
713#endif
714  free(command);
715#ifdef HPC_DEBUG
716  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
717  fflush(stderr);
718#endif
719
720  struct sockaddr_un addr;
721  memset(&addr, 0, sizeof(addr));
722  addr.sun_family = AF_UNIX;
723  int rc, cl, fd = socket(AF_UNIX, SOCK_STREAM, 0);
724  char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+20));
725  sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,uuid->value);
726  strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1);
727 
728  if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
729    perror("bind error");
730    setMapInMaps(m,"lenv","message",_("Unable to bind socket!"));
731    errorException (m, _("Unable to bind socket!"),
732                    "InternalError", NULL);
733#ifdef HPC_DEBUG
734    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
735    fflush(stderr);
736#endif
737    invokeCallback(m,NULL,NULL,7,0);
738    removeReadLocks(main_conf);
739#ifdef HPC_DEBUG
740    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
741    fflush(stderr);
742#endif
743    return -1;
744  }
745#ifdef HPC_DEBUG
746  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
747  fflush(stderr);
748#endif
749  if (listen(fd, 5) == -1) {
750    setMapInMaps(*main_conf,"lenv","message",_("Listen error"));
751    errorException (m, _("Listen error"),
752                    "InternalError", NULL);
753#ifdef HPC_DEBUG
754    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
755    fflush(stderr);
756#endif
757    invokeCallback(m,NULL,NULL,7,0);
758    removeReadLocks(main_conf);
759#ifdef HPC_DEBUG
760    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
761    fflush(stderr);
762#endif
763    return -1;
764  }
765  if ( (cl = accept(fd, NULL, NULL)) == -1) {
766    setMapInMaps(*main_conf,"lenv","message",_("Accept error"));
767    errorException (m, _("Accept error"),
768                    "InternalError", NULL);
769#ifdef HPC_DEBUG
770    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
771    fflush(stderr);
772#endif
773    invokeCallback(m,NULL,NULL,7,0);
774    removeReadLocks(main_conf);
775#ifdef HPC_DEBUG
776    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
777    fflush(stderr);
778#endif
779    return -1;
780  }else{
781#ifdef HPC_DEBUG
782    fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
783    fflush(stderr);
784#endif
785    int hasPassed=-1;
786    char buf[11];
787    memset(&buf,0,11);
788    while ( (rc=read(cl,buf,10)) ) {     
789      if(rc==0){
790        sleep(1);
791        setMapInMaps(*main_conf,"lenv","message",_("Read closed"));
792        errorException (m, _("Read closed"),
793                        "InternalError", NULL);
794#ifdef HPC_DEBUG
795        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
796        fflush(stderr);
797#endif
798        invokeCallback(m,NULL,NULL,7,0);
799        removeReadLocks(main_conf);
800#ifdef HPC_DEBUG
801        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
802        fflush(stderr);
803#endif
804        return -1;
805      }else{
806        if(rc<0){
807          setMapInMaps(*main_conf,"lenv","message",_("Read error"));
808          errorException (m, _("Read error"),
809                          "InternalError", NULL);
810#ifdef HPC_DEBUG
811          fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
812          fflush(stderr);
813#endif
814          invokeCallback(m,NULL,NULL,7,0);
815          removeReadLocks(main_conf);
816#ifdef HPC_DEBUG
817          fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
818          fflush(stderr);
819#endif
820          return -1;
821        }
822      }
823      hasPassed=1;
824      res=atoi(buf);
825      unlink(sname);
826      free(sname);
827      removeReadLocks(main_conf);
828 
829      if(res==3){
830#ifdef HPC_DEBUG
831        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
832        fflush(stderr);
833#endif
834        invokeCallback(m,NULL,outputs,5,0);
835#ifdef HPC_DEBUG
836        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
837        fflush(stderr);
838#endif
839
840        // Read informations provided by FinalizeHPC as a configuration file
841        // then, remove the file.
842        map* jobid=getMapFromMaps(*main_conf,"lenv","usid");
843        map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");
844        char *filePath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+15)*sizeof(char));
845        sprintf(filePath,"%s/exec_status_%s",tmpPath->value,jobid->value);
846        maps* m = (maps *) malloc (MAPS_SIZE);
847        m->child=NULL;
848        m->next=NULL;
849        int saved_stdout = dup (fileno (stdout));
850        dup2 (fileno (stderr), fileno (stdout));
851        conf_read(filePath,m);
852        fflush(stdout);
853        dup2 (saved_stdout, fileno (stdout));
854        close(saved_stdout);
855        unlink(filePath);
856        free(filePath);
857        addMapsToMaps(main_conf,m);
858        freeMaps(&m);
859        free(m);
860
861        input=*real_outputs;
862        while(input!=NULL){
863          if(input->child==NULL){
864            map* generatedFile=getMap(input->content,"generated_file");
865            if(generatedFile!=NULL){
866              char* filename=strrchr(generatedFile->value,'/');
867              char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
868              sprintf(targetPath,"%s/%s",tmpPath->value,filename);
869              test=ssh_connect(*main_conf);
870              if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){
871                setMapInMaps(*real_outputs,input->name,"generated_file",targetPath);
872                free(targetPath);
873              }else{
874                char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
875                sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
876                setMapInMaps(m,"lenv","message",tmpStr);
877                free(tmpStr);
878                invokeCallback(m,NULL,NULL,7,0);
879                return SERVICE_FAILED;
880              }
881            }       
882          }else{
883            map* generatedFile=getMap(input->content,"generated_file");
884            if(generatedFile!=NULL){
885              char* filename=strrchr(generatedFile->value,'/');
886              char* targetPath=(char*)malloc((strlen(tmpPath->value)+strlen(filename)+2)*sizeof(char));
887              sprintf(targetPath,"%s/%s",tmpPath->value,filename);
888              test=ssh_connect(*main_conf);
889              if(ssh_fetch(*main_conf,targetPath,generatedFile->value,ssh_get_cnt(m))==0){
890                maps* tmp=getMaps(*real_outputs,input->name);
891                char serviceName[9];
892                freeMap(&tmp->content);
893                free(tmp->content);
894                tmp->content=NULL;
895                maps* output=getMaps(*real_outputs,input->name);
896                setMapInMaps(output->child,"download_link","generated_file",targetPath);
897                setMapInMaps(output->child,"download_link","storage",targetPath);
898                setMapInMaps(output->child,"download_link","useMapserver","false");
899                setMapInMaps(output->child,"download_link","replicateStorageNext","true");
900                setMapInMaps(output->child,"download_link","asReference","true");
901                setMapInMaps(output->child,"download_link","inRequest","true");
902                setMapInMaps(output->child,"wms_link","generated_file",targetPath);
903                setMapInMaps(output->child,"wms_link","storage",targetPath);
904                setMapInMaps(output->child,"wms_link","useMapserver","true");
905                setMapInMaps(output->child,"wms_link","msOgc","WMS");
906                setMapInMaps(output->child,"wms_link","requestedMimeType","image/png");
907                setMapInMaps(output->child,"wms_link","asReference","true");
908                if(getMaps(output->child,"wcs_link")!=NULL){
909                  sprintf(serviceName,"wcs_link");
910                  setMapInMaps(output->child,serviceName,"msOgc","WCS");
911                  setMapInMaps(output->child,serviceName,"requestedMimeType","image/tiff");
912                }
913                else{
914                  sprintf(serviceName,"wfs_link");
915                  setMapInMaps(output->child,serviceName,"msOgc","WFS");
916                  setMapInMaps(output->child,serviceName,"requestedMimeType","text/xml");
917                }
918                setMapInMaps(output->child,serviceName,"storage",targetPath);
919                setMapInMaps(output->child,serviceName,"generated_file",targetPath);
920                setMapInMaps(output->child,serviceName,"useMapserver","true");
921                setMapInMaps(output->child,serviceName,"asReference","true");
922              }else{
923                map* hpcStdErr=getMapFromMaps(*main_conf,"henv","StdErr");
924                if(hpcStdErr!=NULL && ssh_fetch(*main_conf,targetPath,hpcStdErr->value,ssh_get_cnt(m))==0){
925                  struct stat f_status;
926                  int ts=stat(targetPath, &f_status);
927                  if(ts==0) {
928                    char* fcontent = NULL;
929                    fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1));
930                    FILE* f=fopen(targetPath,"rb");
931                    fread(fcontent,f_status.st_size,1,f);
932                    int fsize=f_status.st_size;
933                    fcontent[fsize]=0;
934                    fclose(f);
935                    setMapInMaps(*main_conf,"lenv","message",fcontent);
936                    free(fcontent);
937                  }else{
938                    char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
939                    sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
940                    setMapInMaps(*main_conf,"lenv","message",tmpStr);
941                    free(tmpStr);
942                  }
943                }else{
944                  char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char));
945                  sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename);
946                  setMapInMaps(*main_conf,"lenv","message",tmpStr);
947                  free(tmpStr);
948                }
949                invokeCallback(*main_conf,NULL,NULL,7,0);
950                return SERVICE_FAILED;
951              }
952              free(targetPath);
953            }
954          }
955          input=input->next;
956        }
957
958      }else{
959        // Try to access remotely to the log file and return a more relevant error message
960        setMapInMaps(m,"lenv","message",_("HPC Execution failed!"));
961        errorException (m, _("HPC Execution failed!"),
962                        "InternalError", NULL);
963#ifdef HPC_DEBUG
964        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
965        fflush(stderr);
966#endif
967        invokeCallback(m,NULL,NULL,7,0);
968#ifdef HPC_DEBUG
969        fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
970        fflush(stderr);
971#endif
972      }
973      //free(buf);
974    }
975    if(hasPassed<0){
976      perror("Failed to read");
977      setMapInMaps(*main_conf,"lenv","message",_("Unable to parse the value returned by remote execution"));
978      errorException (m, _("Unable to parse the value returned by remote execution"),
979                      "InternalError", NULL);
980#ifdef HPC_DEBUG
981      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
982      fflush(stderr);
983#endif
984      invokeCallback(m,NULL,NULL,7,0);
985#ifdef HPC_DEBUG
986      fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
987      fflush(stderr);
988#endif
989      return SERVICE_FAILED;
990    }
991  }
992#ifdef HPC_DEBUG
993  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
994  fflush(stderr);
995#endif
996  ssh_close(*main_conf);
997#ifdef HPC_DEBUG
998  fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);
999  fflush(stderr);
1000#endif
1001  return res;
1002}
Note: See TracBrowser for help on using the repository browser.

Search

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png