- Timestamp:
- Aug 31, 2017, 4:14:46 PM (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c
r850 r851 30 30 #include "service_json.h" 31 31 #include "service_internal_ms.h" 32 #include <pthread.h> 32 33 33 34 #ifdef __cplusplus … … 35 36 #endif 36 37 38 int nbThreads=0; 39 int cStep=0; 40 pthread_t* myThreads=NULL; 41 bool steps[7][2]={ 42 {false,false}, 43 {false,false}, 44 {false,false}, 45 {false,false}, 46 {false,false}, 47 {false,false}, 48 {false,false} 49 }; 50 37 51 /** 38 * Check if a service name is prohibited, meaning that we don't have to invoke39 * t he callback for this specific service.52 * Check if a service name is prohibited, meaning that the Kernel doesn't have 53 * to invoke the callback for this specific service. 40 54 * 55 * @param conf the main configuration file maps 56 * @param serviceName the serviceName 57 * @return a bool true if the service is prohibited, false in other case 41 58 */ 42 59 bool isProhibited(maps* conf,const char* serviceName){ … … 52 69 } 53 70 return false; 71 } 72 73 /** 74 * Parameter definition to be used for sending parameters to a thread. 75 */ 76 typedef struct { 77 maps *conf; //!< the main configuration file 78 map *url; //!< the callback url maps 79 json_object *res;//!< the JSON object to post 80 int step; //!< the current step [0,6] 81 int state; //!< the current state [0,1] 82 } local_params; 83 84 /** 85 * Verify if the URL should use a shared cache or not. 86 * 87 * In case the security section contains a key named "shared", then if the 88 * domain listed in the shared key are contained in the url given as parameter 89 * then it return "SHARED" in other cases, it returns "OTHER". 90 * 91 * @param conf the main configuration file maps 92 * @param url the URL to evaluate 93 * @return a string "SHARED" in case the host is in a domain listed in the 94 * shared key, "OTHER" in other cases. 95 */ 96 char* getProvenance(maps* conf,const char* url){ 97 map* sharedCache=getMapFromMaps(conf,"security","shared"); 98 if(sharedCache!=NULL){ 99 char *res=NULL; 100 char *hosts=sharedCache->value; 101 char *curs=strtok(hosts,","); 102 while(curs!=NULL){ 103 if(strstr(url,curs)==NULL) 104 res="OTHER"; 105 else{ 106 res="SHARED"; 107 return res; 108 } 109 } 110 return res; 111 } 112 return "OTHER"; 113 } 114 115 /** 116 * Practically invoke the callback, meaning sending the HTTP POST request. 117 * 118 * @param args local_params containing all the variables required 119 */ 120 void* _invokeCallback(void* args){ 121 local_params* arg=(local_params*)args; 122 HINTERNET hInternet,res1; 123 hInternet=InternetOpen("ZooWPSClient\0", 124 INTERNET_OPEN_TYPE_PRECONFIG, 125 NULL,NULL, 0); 126 if(!CHECK_INET_HANDLE(hInternet)){ 127 InternetCloseHandle (&hInternet); 128 return false; 129 } 130 char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char)); 131 sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state); 132 const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN); 133 hInternet.waitingRequests[0] = zStrdup(URL); 134 free(URL); 135 fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__); 136 fprintf(stderr," * JSON: [%s] \n",jsonStr); 137 fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]); 138 fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__); 139 while( cStep!=7 && 140 ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) ) 141 ){ 142 sleep(1); 143 } 144 fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__); 145 int i=0; 146 for(i=0;i<7;i++){ 147 fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]); 148 fflush(stderr); 149 } 150 fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n",pthread_self(),__FILE__,__LINE__); 151 fflush(stderr); 152 res1 = InternetOpenUrl (&hInternet, 153 hInternet.waitingRequests[0], 154 (char*)jsonStr, strlen(jsonStr), 155 INTERNET_FLAG_NO_CACHE_WRITE, 156 0); 157 AddHeaderEntries(&hInternet,arg->conf); 158 //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x 159 processDownloads(&hInternet); 160 fprintf(stderr,"************************* From thread %d %s %d: REQUEST END\n\n",pthread_self(),__FILE__,__LINE__); 161 fflush(stderr); 162 char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1) 163 * sizeof (char)); 164 if (tmp == NULL) 165 { 166 setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory")); 167 setMapInMaps(arg->conf,"lenv","code","InternalError"); 168 return NULL; 169 } 170 size_t bRead; 171 InternetReadFile (hInternet.ihandle[0], 172 (LPVOID) tmp, 173 hInternet. 174 ihandle[0].nDataLen, 175 &bRead); 176 tmp[hInternet.ihandle[0].nDataLen] = 0; 177 json_object_put(arg->res); 178 InternetCloseHandle(&hInternet); 179 if(cStep==0 || cStep==6 || arg->state==1) 180 cStep=arg->step+1; 181 steps[arg->step][arg->state]=true; 182 fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__); 183 for(i=0;i<7;i++){ 184 fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]); 185 } 186 fprintf(stderr,"Result: \n%s\n\n",tmp); 187 fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__); 188 fflush(stderr); 189 free(tmp); 190 free(args); 191 pthread_exit(NULL); 54 192 } 55 193 … … 72 210 * @return bool true in case of success, false in other cases 73 211 */ 74 bool invokeCallback(maps* m,maps* inputs,maps* outputs,int step,int state){75 map* url=getMapFromMaps( m,"callback","url");212 bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){ 213 map* url=getMapFromMaps(conf,"callback","url"); 76 214 if(url==NULL) 77 215 return false; 78 216 79 maps* lenv=getMaps( m,"lenv");217 maps* lenv=getMaps(conf,"lenv"); 80 218 map* sname=getMap(lenv->content,"identifier"); 81 if(sname!=NULL && isProhibited( m,sname->value))219 if(sname!=NULL && isProhibited(conf,sname->value)) 82 220 return false; 83 221 84 222 json_object *res=json_object_new_object(); 85 223 86 map* sid=getMapFromMaps( m,"lenv","usid");224 map* sid=getMapFromMaps(conf,"lenv","usid"); 87 225 if(sid!=NULL){ 88 226 json_object *jsStr=json_object_new_string(sid->value); 89 227 json_object_object_add(res,"jobid",jsStr); 90 228 } 229 const struct tm *tm; 230 size_t len; 231 time_t now; 232 char *tmp1; 233 map *tmpStatus; 234 235 now = time ( NULL ); 236 tm = localtime ( &now ); 237 238 tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char)); 239 240 len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm ); 241 json_object *jsStr0=json_object_new_string(tmp1); 242 json_object_object_add(res,"datetime",jsStr0); 243 91 244 switch(step){ 92 245 case 0: { 93 246 // Create a new analyze 94 maps* lenv=getMaps( m,"lenv");95 sid=getMapFromMaps( m,"lenv","xrequest");247 maps* lenv=getMaps(conf,"lenv"); 248 sid=getMapFromMaps(conf,"lenv","xrequest"); 96 249 if(sid!=NULL){ 97 250 json_object *jsStr=json_object_new_string(sid->value); 98 251 json_object_object_add(res,"request_execute_content",jsStr); 99 252 } 100 sid=getMapFromMaps( m,"lenv","identifier");253 sid=getMapFromMaps(conf,"lenv","identifier"); 101 254 if(sid!=NULL){ 102 255 json_object *jsStr=json_object_new_string(sid->value); … … 108 261 // Fetching data inputs 109 262 maps* curs=inputs; 110 111 263 char *keys[8][2]={ 112 264 { … … 139 291 }, 140 292 { 141 " datatype",293 "geodatatype", 142 294 "datatype" 143 295 } … … 147 299 map* tmpMap=getMap(curs->content,"cache_file"); 148 300 sid=getMap(curs->content,"ref_wms_link"); 301 json_object *res2=json_object_new_object(); 149 302 if(tmpMap!=NULL && sid==NULL){ 150 303 addToMap(curs->content,"generated_file",tmpMap->value); 304 struct stat buf; 305 char timeStr[ 100 ] = ""; 306 if (stat(tmpMap->value, &buf)==0){ 307 strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime)); 308 json_object *jsStr=json_object_new_string(timeStr); 309 json_object_object_add(res2,"creation_date",jsStr); 310 } 151 311 tmpMap=getMap(curs->content,"fmimeType"); 152 312 if(tmpMap!=NULL){ 153 313 addToMap(curs->content,"mimeType",tmpMap->value); 154 314 } 155 setReferenceUrl( m,curs);156 //outputMapfile( m,curs);315 setReferenceUrl(conf,curs); 316 //outputMapfile(conf,curs); 157 317 dumpMaps(curs); 158 318 } 159 json_object *res2=json_object_new_object();160 319 int i=0; 161 320 int hasRef=-1; … … 165 324 json_object *jsStr=json_object_new_string(sid->value); 166 325 json_object_object_add(res2,keys[i][1],jsStr); 167 if(i==0) 326 if(i==0){ 168 327 hasRef=1; 328 json_object *jsStr1=json_object_new_string(getProvenance(conf,url->value)); 329 json_object_object_add(res2,"dataOrigin",jsStr1); 330 } 169 331 } 170 332 } … … 176 338 } 177 339 json_object_object_add(res,"inputs",res1); 178 json_object* in=mapsToJson(inputs);179 if(in!=NULL){180 //json_object_object_add(res,"inputs",in);181 json_object_put(in);182 }183 340 break; 184 341 } 185 342 case 2: { 186 343 // Uploading data input to cluster 187 maps* in=getMaps( m,"uploadQueue");344 maps* in=getMaps(conf,"uploadQueue"); 188 345 if(in!=NULL){ 189 346 maps* curs=in; … … 216 373 case 3: { 217 374 // Generating job script 218 sid=getMapFromMaps( m,"lenv","local_script");375 sid=getMapFromMaps(conf,"lenv","local_script"); 219 376 if(sid!=NULL){ 220 377 json_object *jsStr=json_object_new_string(sid->value); … … 225 382 case 4: { 226 383 // Submitting job to cluster 227 sid=getMapFromMaps( m,"lenv","remote_script");384 sid=getMapFromMaps(conf,"lenv","remote_script"); 228 385 if(sid!=NULL){ 229 386 json_object *jsStr=json_object_new_string(sid->value); … … 234 391 case 5: { 235 392 // Downloading process outputs from cluster 236 json_object* in=mapsToJson(outputs); 237 if(in!=NULL){ 238 //json_object_object_add(res,"outputs",in); 239 json_object_put(in); 240 } 393 //json_object* in=mapsToJson(outputs); 394 dumpMaps(outputs); 395 maps* curs=outputs; 396 char *keys[8][2]={ 397 { 398 "Reference", 399 "ref" 400 }, 401 { 402 "storage", 403 "cachefile" 404 }, 405 { 406 "fmimeType", 407 "mimetype" 408 }, 409 { 410 "size", 411 "size" 412 }, 413 { 414 "ref_wms_link", 415 "ref_wms_link" 416 }, 417 { 418 "ref_wcs_link", 419 "ref_wcs_link" 420 }, 421 { 422 "ref_wfs_link", 423 "ref_wfs_link" 424 }, 425 { 426 "geodatatype", 427 "datatype" 428 } 429 }; 430 json_object *res1=json_object_new_object(); 431 while(curs!=NULL){ 432 map* tmpMap=getMap(curs->content,"cache_file"); 433 sid=getMap(curs->content,"ref_wms_link"); 434 json_object *res2=json_object_new_object(); 435 int i=0; 436 int hasRef=-1; 437 for(;i<8;i++){ 438 sid=getMap(curs->content,keys[i][0]); 439 if(sid!=NULL){ 440 json_object *jsStr=json_object_new_string(sid->value); 441 json_object_object_add(res2,keys[i][1],jsStr); 442 if(i==0) 443 hasRef=1; 444 } 445 } 446 if(hasRef>0) 447 json_object_object_add(res1,curs->name,res2); 448 else{ 449 maps* curs0=curs->child; 450 while(curs0!=NULL){ 451 json_object *res3=json_object_new_object(); 452 int i0=0; 453 int hasRef0=-1; 454 for(;i0<8;i0++){ 455 sid=getMap(curs0->content,keys[i0][0]); 456 if(sid!=NULL){ 457 json_object *jsStr=json_object_new_string(sid->value); 458 json_object_object_add(res3,keys[i0][1],jsStr); 459 //if(i0==0) 460 hasRef0=1; 461 } 462 } 463 if(hasRef0<0) 464 json_object_put(res3); 465 else 466 json_object_object_add(res2,curs0->name,res3); 467 curs0=curs0->next; 468 } 469 json_object_object_add(res1,curs->name,res2); 470 } 471 curs=curs->next; 472 } 473 json_object_object_add(res,"outputs",res1); 241 474 break; 242 475 } 243 476 case 6: { 244 477 // Finalize HPC 245 sid=getMapFromMaps( m,"lenv","local_script");478 sid=getMapFromMaps(conf,"lenv","local_script"); 246 479 if(sid!=NULL){ 247 480 json_object *jsStr=json_object_new_string(sid->value); … … 252 485 case 7: { 253 486 // Error or Dismiss 254 sid=getMapFromMaps( m,"lenv","message");487 sid=getMapFromMaps(conf,"lenv","message"); 255 488 if(sid!=NULL){ 256 489 json_object *jsStr=json_object_new_string(sid->value); … … 265 498 } 266 499 } 267 fprintf(stderr,"************************* %s %d\n\n",__FILE__,__LINE__); 268 fflush(stderr); 269 HINTERNET hInternet,res1; 270 hInternet=InternetOpen("ZooWPSClient\0", 271 INTERNET_OPEN_TYPE_PRECONFIG, 272 NULL,NULL, 0); 273 if(!CHECK_INET_HANDLE(hInternet)){ 274 InternetCloseHandle (&hInternet); 500 501 local_params* argumentsA=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int))); 502 argumentsA->conf=conf; 503 argumentsA->url=url; 504 argumentsA->res=res; 505 argumentsA->step=step; 506 argumentsA->state=state; 507 //pthread_t p1; 508 if(myThreads==NULL) 509 myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t)); 510 else 511 myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t)); 512 if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)argumentsA)==-1){ 513 setMapInMaps(conf,"lenv","message",_("Unable to create a new thread")); 275 514 return false; 276 515 } 277 fprintf(stderr," * JSON: [%s] \n",json_object_to_json_string_ext(res,JSON_C_TO_STRING_PLAIN)); 278 fprintf(stderr," * URL: %s%d_%d/ \n\n",url->value,step,state); 279 fflush(stderr); 280 char *URL=(char*)malloc((strlen(url->value)+5)*sizeof(char)); 281 sprintf(URL,"%s%d_%d/",url->value,step,state); 282 hInternet.waitingRequests[0] = zStrdup(URL); 283 free(URL); 284 const char* jsonStr=json_object_to_json_string_ext(res,JSON_C_TO_STRING_PLAIN); 285 res1 = InternetOpenUrl (&hInternet, 286 hInternet.waitingRequests[0], 287 (char*)jsonStr, strlen(jsonStr), 288 INTERNET_FLAG_NO_CACHE_WRITE, 289 0); 290 AddHeaderEntries(&hInternet,m); 291 //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1); 292 processDownloads(&hInternet); 293 char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1) 294 * sizeof (char)); 295 if (tmp == NULL) 296 { 297 setMapInMaps(m,"lenv","message",_("Unable to allocate memory")); 298 setMapInMaps(m,"lenv","code","InternalError"); 299 return false; 300 } 301 size_t bRead; 302 InternetReadFile (hInternet.ihandle[0], 303 (LPVOID) tmp, 304 hInternet. 305 ihandle[0].nDataLen, 306 &bRead); 307 tmp[hInternet.ihandle[0].nDataLen] = 0; 308 fprintf(stderr,"Result: \n%s\n\n",tmp); 309 fprintf(stderr,"************************* %s %d\n\n",__FILE__,__LINE__); 310 fflush(stderr); 311 free(tmp); 312 json_object_put(res); 313 InternetCloseHandle(&hInternet); 516 //free(argumentsA); 517 nbThreads++; 314 518 return true; 519 } 520 521 void cleanupCallbackThreads(){ 522 int i=0; 523 for(i=0;i<nbThreads;i++){ 524 pthread_join(myThreads[i],NULL); 525 } 526 free(myThreads); 315 527 } 316 528
Note: See TracChangeset
for help on using the changeset viewer.