Ignore:
Timestamp:
Aug 31, 2017, 4:14:46 PM (7 years ago)
Author:
djay
Message:

Invoke callback asynchronously. Still the ZOO-Kernel has still to wait for every requests to finish before stoping its execution.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/prototype-v0/zoo-project/zoo-kernel/service_callback.c

    r850 r851  
    3030#include "service_json.h"
    3131#include "service_internal_ms.h"
     32#include <pthread.h>
    3233
    3334#ifdef __cplusplus
     
    3536#endif
    3637
     38  int nbThreads=0;
     39  int cStep=0;
     40  pthread_t* myThreads=NULL;
     41  bool steps[7][2]={
     42    {false,false},
     43    {false,false},
     44    {false,false},
     45    {false,false},
     46    {false,false},
     47    {false,false},
     48    {false,false}
     49  };
     50 
    3751  /**
    38    * Check if a service name is prohibited, meaning that we don't have to invoke
    39    * the callback for this specific service.
     52   * Check if a service name is prohibited, meaning that the Kernel doesn't have
     53   * to invoke the callback for this specific service.
    4054   *
     55   * @param conf the main configuration file maps
     56   * @param serviceName the serviceName
     57   * @return a bool true if the service is prohibited, false in other case
    4158   */
    4259  bool isProhibited(maps* conf,const char* serviceName){
     
    5269    }
    5370    return false;
     71  }
     72
     73  /**
     74   * Parameter definition to be used for sending parameters to a thread.
     75   */
     76  typedef struct {
     77    maps *conf;      //!< the main configuration file
     78    map *url;        //!< the callback url maps
     79    json_object *res;//!< the JSON object to post
     80    int step;        //!< the current step [0,6]
     81    int state;       //!< the current state [0,1]
     82  } local_params;
     83
     84  /**
     85   * Verify if the URL should use a shared cache or not.
     86   *
     87   * In case the security section contains a key named "shared", then if the
     88   * domain listed in the shared key are contained in the url given as parameter
     89   * then it return "SHARED" in other cases, it returns "OTHER".
     90   *
     91   * @param conf the main configuration file maps
     92   * @param url the URL to evaluate
     93   * @return a string "SHARED" in case the host is in a domain listed in the
     94   * shared key, "OTHER" in other cases.
     95   */
     96  char* getProvenance(maps* conf,const char* url){
     97    map* sharedCache=getMapFromMaps(conf,"security","shared");
     98    if(sharedCache!=NULL){
     99      char *res=NULL;
     100      char *hosts=sharedCache->value;
     101      char *curs=strtok(hosts,",");
     102      while(curs!=NULL){
     103        if(strstr(url,curs)==NULL)
     104          res="OTHER";
     105        else{
     106          res="SHARED";
     107          return res;
     108        }
     109      }
     110      return res;
     111    }
     112    return "OTHER";
     113  }
     114
     115  /**
     116   * Practically invoke the callback, meaning sending the HTTP POST request.
     117   *
     118   * @param args local_params containing all the variables required
     119   */
     120  void* _invokeCallback(void* args){
     121    local_params* arg=(local_params*)args;
     122    HINTERNET hInternet,res1;
     123    hInternet=InternetOpen("ZooWPSClient\0",
     124                           INTERNET_OPEN_TYPE_PRECONFIG,
     125                           NULL,NULL, 0);
     126    if(!CHECK_INET_HANDLE(hInternet)){
     127      InternetCloseHandle (&hInternet);
     128      return false;
     129    }
     130    char *URL=(char*)malloc((strlen(arg->url->value)+5)*sizeof(char));
     131    sprintf(URL,"%s%d_%d/",arg->url->value,arg->step,arg->state);
     132    const char* jsonStr=json_object_to_json_string_ext(arg->res,JSON_C_TO_STRING_PLAIN);
     133    hInternet.waitingRequests[0] = zStrdup(URL);
     134    free(URL);
     135    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
     136    fprintf(stderr," * JSON: [%s] \n",jsonStr);
     137    fprintf(stderr," * URL: %s/ \n\n",hInternet.waitingRequests[0]);
     138    fprintf(stderr,"************************* From thread %d %s %d: REQUEST PARAMETERS\n",pthread_self(),__FILE__,__LINE__);
     139    while( cStep!=7 &&
     140           ( cStep!=arg->step || (arg->state!=0 && steps[arg->step][0]==false) )
     141           ){
     142      sleep(1);
     143    }
     144    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n\n",pthread_self(),__FILE__,__LINE__);
     145    int i=0;
     146    for(i=0;i<7;i++){
     147      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
     148      fflush(stderr);
     149    }
     150    fprintf(stderr,"************************* From thread %d %s %d: REQUEST START\n",pthread_self(),__FILE__,__LINE__);
     151    fflush(stderr);
     152    res1 = InternetOpenUrl (&hInternet,
     153                            hInternet.waitingRequests[0],
     154                            (char*)jsonStr, strlen(jsonStr),
     155                            INTERNET_FLAG_NO_CACHE_WRITE,
     156                            0);
     157    AddHeaderEntries(&hInternet,arg->conf);
     158    //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);x
     159    processDownloads(&hInternet);
     160    fprintf(stderr,"************************* From thread %d %s %d: REQUEST END\n\n",pthread_self(),__FILE__,__LINE__);
     161    fflush(stderr);
     162    char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
     163                                 * sizeof (char));
     164    if (tmp == NULL)
     165      {
     166        setMapInMaps(arg->conf,"lenv","message",_("Unable to allocate memory"));
     167        setMapInMaps(arg->conf,"lenv","code","InternalError");
     168        return NULL;
     169      }
     170    size_t bRead;
     171    InternetReadFile (hInternet.ihandle[0],
     172                      (LPVOID) tmp,
     173                      hInternet.
     174                      ihandle[0].nDataLen,
     175                      &bRead);
     176    tmp[hInternet.ihandle[0].nDataLen] = 0;
     177    json_object_put(arg->res);
     178    InternetCloseHandle(&hInternet);
     179    if(cStep==0 || cStep==6 || arg->state==1)
     180      cStep=arg->step+1;
     181    steps[arg->step][arg->state]=true;
     182    fprintf(stderr,"************************* From thread %d %s %d: RESPONSE CONTENT\n",pthread_self(),__FILE__,__LINE__);
     183    for(i=0;i<7;i++){
     184      fprintf(stderr,"%d) %d %d\n",i,steps[i][0],steps[i][1]);
     185    }
     186    fprintf(stderr,"Result: \n%s\n\n",tmp);
     187    fprintf(stderr,"************************* From thread %d %s %d\n\n",pthread_self(),__FILE__,__LINE__);
     188    fflush(stderr);
     189    free(tmp);
     190    free(args);
     191    pthread_exit(NULL);
    54192  }
    55193 
     
    72210   * @return bool true in case of success, false in other cases
    73211   */
    74   bool invokeCallback(maps* m,maps* inputs,maps* outputs,int step,int state){
    75     map* url=getMapFromMaps(m,"callback","url");
     212  bool invokeCallback(maps* conf,maps* inputs,maps* outputs,int step,int state){
     213    map* url=getMapFromMaps(conf,"callback","url");
    76214    if(url==NULL)
    77215      return false;
    78216     
    79     maps* lenv=getMaps(m,"lenv");
     217    maps* lenv=getMaps(conf,"lenv");
    80218    map* sname=getMap(lenv->content,"identifier");
    81     if(sname!=NULL && isProhibited(m,sname->value))
     219    if(sname!=NULL && isProhibited(conf,sname->value))
    82220      return false;
    83221     
    84222    json_object *res=json_object_new_object();
    85223
    86     map* sid=getMapFromMaps(m,"lenv","usid");
     224    map* sid=getMapFromMaps(conf,"lenv","usid");
    87225    if(sid!=NULL){
    88226      json_object *jsStr=json_object_new_string(sid->value);
    89227      json_object_object_add(res,"jobid",jsStr);
    90228    }
     229    const struct tm *tm;
     230    size_t len;
     231    time_t now;
     232    char *tmp1;
     233    map *tmpStatus;
     234 
     235    now = time ( NULL );
     236    tm = localtime ( &now );
     237
     238    tmp1 = (char*)malloc((TIME_SIZE+1)*sizeof(char));
     239
     240    len = strftime ( tmp1, TIME_SIZE, "%Y-%m-%dT%I:%M:%SZ", tm );
     241    json_object *jsStr0=json_object_new_string(tmp1);
     242    json_object_object_add(res,"datetime",jsStr0);
     243
    91244    switch(step){
    92245    case 0: {
    93246      // Create a new analyze
    94       maps* lenv=getMaps(m,"lenv");
    95       sid=getMapFromMaps(m,"lenv","xrequest");
     247      maps* lenv=getMaps(conf,"lenv");
     248      sid=getMapFromMaps(conf,"lenv","xrequest");
    96249      if(sid!=NULL){
    97250        json_object *jsStr=json_object_new_string(sid->value);
    98251        json_object_object_add(res,"request_execute_content",jsStr);
    99252      }
    100       sid=getMapFromMaps(m,"lenv","identifier");
     253      sid=getMapFromMaps(conf,"lenv","identifier");
    101254      if(sid!=NULL){
    102255        json_object *jsStr=json_object_new_string(sid->value);
     
    108261      // Fetching data inputs
    109262      maps* curs=inputs;
    110      
    111263      char *keys[8][2]={
    112264        {
     
    139291        },
    140292        {
    141           "datatype",
     293          "geodatatype",
    142294          "datatype"
    143295        }       
     
    147299        map* tmpMap=getMap(curs->content,"cache_file");
    148300        sid=getMap(curs->content,"ref_wms_link");
     301        json_object *res2=json_object_new_object();
    149302        if(tmpMap!=NULL && sid==NULL){
    150303          addToMap(curs->content,"generated_file",tmpMap->value);
     304          struct stat buf;
     305          char timeStr[ 100 ] = "";
     306          if (stat(tmpMap->value, &buf)==0){
     307            strftime(timeStr, 100, "%d-%m-%Y %H:%M:%S", localtime( &buf.st_mtime));
     308            json_object *jsStr=json_object_new_string(timeStr);
     309            json_object_object_add(res2,"creation_date",jsStr);
     310          }
    151311          tmpMap=getMap(curs->content,"fmimeType");
    152312          if(tmpMap!=NULL){
    153313            addToMap(curs->content,"mimeType",tmpMap->value);
    154314          }
    155           setReferenceUrl(m,curs);
    156           //outputMapfile(m,curs);
     315          setReferenceUrl(conf,curs);
     316          //outputMapfile(conf,curs);
    157317          dumpMaps(curs);
    158318        }
    159         json_object *res2=json_object_new_object();
    160319        int i=0;
    161320        int hasRef=-1;
     
    165324            json_object *jsStr=json_object_new_string(sid->value);
    166325            json_object_object_add(res2,keys[i][1],jsStr);
    167             if(i==0)
     326            if(i==0){
    168327              hasRef=1;
     328              json_object *jsStr1=json_object_new_string(getProvenance(conf,url->value));
     329              json_object_object_add(res2,"dataOrigin",jsStr1);
     330            }
    169331          }
    170332        }
     
    176338      }
    177339      json_object_object_add(res,"inputs",res1);
    178       json_object* in=mapsToJson(inputs);
    179       if(in!=NULL){
    180         //json_object_object_add(res,"inputs",in);
    181         json_object_put(in);
    182       }
    183340      break;
    184341    }
    185342    case 2: {
    186343      // Uploading data input to cluster
    187       maps* in=getMaps(m,"uploadQueue");
     344      maps* in=getMaps(conf,"uploadQueue");
    188345      if(in!=NULL){
    189346        maps* curs=in;
     
    216373    case 3: {
    217374      // Generating job script
    218       sid=getMapFromMaps(m,"lenv","local_script");
     375      sid=getMapFromMaps(conf,"lenv","local_script");
    219376      if(sid!=NULL){
    220377        json_object *jsStr=json_object_new_string(sid->value);
     
    225382    case 4: {
    226383      // Submitting job to cluster
    227       sid=getMapFromMaps(m,"lenv","remote_script");
     384      sid=getMapFromMaps(conf,"lenv","remote_script");
    228385      if(sid!=NULL){
    229386        json_object *jsStr=json_object_new_string(sid->value);
     
    234391    case 5: {
    235392      // Downloading process outputs from cluster
    236       json_object* in=mapsToJson(outputs);
    237       if(in!=NULL){
    238         //json_object_object_add(res,"outputs",in);
    239         json_object_put(in);
    240       }
     393      //json_object* in=mapsToJson(outputs);
     394      dumpMaps(outputs);
     395      maps* curs=outputs;
     396      char *keys[8][2]={
     397        {
     398          "Reference",
     399          "ref"
     400        },
     401        {
     402          "storage",
     403          "cachefile"
     404        },
     405        {
     406          "fmimeType",
     407          "mimetype"
     408        },
     409        {
     410          "size",
     411          "size"
     412        },
     413        {
     414          "ref_wms_link",
     415          "ref_wms_link"
     416        },
     417        {
     418          "ref_wcs_link",
     419          "ref_wcs_link"
     420        },
     421        {
     422          "ref_wfs_link",
     423          "ref_wfs_link"
     424        },
     425        {
     426          "geodatatype",
     427          "datatype"
     428        }       
     429      };
     430      json_object *res1=json_object_new_object();
     431      while(curs!=NULL){       
     432        map* tmpMap=getMap(curs->content,"cache_file");
     433        sid=getMap(curs->content,"ref_wms_link");
     434        json_object *res2=json_object_new_object();
     435        int i=0;
     436        int hasRef=-1;
     437        for(;i<8;i++){
     438          sid=getMap(curs->content,keys[i][0]);
     439          if(sid!=NULL){
     440            json_object *jsStr=json_object_new_string(sid->value);
     441            json_object_object_add(res2,keys[i][1],jsStr);
     442            if(i==0)
     443              hasRef=1;
     444          }
     445        }
     446        if(hasRef>0)
     447          json_object_object_add(res1,curs->name,res2);
     448        else{
     449          maps* curs0=curs->child;
     450          while(curs0!=NULL){
     451            json_object *res3=json_object_new_object();
     452            int i0=0;
     453            int hasRef0=-1;
     454            for(;i0<8;i0++){
     455              sid=getMap(curs0->content,keys[i0][0]);
     456              if(sid!=NULL){
     457                json_object *jsStr=json_object_new_string(sid->value);
     458                json_object_object_add(res3,keys[i0][1],jsStr);
     459                //if(i0==0)
     460                hasRef0=1;
     461              }
     462            }
     463            if(hasRef0<0)
     464              json_object_put(res3);
     465            else
     466              json_object_object_add(res2,curs0->name,res3);
     467            curs0=curs0->next;
     468          }       
     469          json_object_object_add(res1,curs->name,res2);
     470        }
     471        curs=curs->next;
     472      }
     473      json_object_object_add(res,"outputs",res1);
    241474      break;
    242475    }
    243476    case 6: {
    244477      // Finalize HPC
    245       sid=getMapFromMaps(m,"lenv","local_script");
     478      sid=getMapFromMaps(conf,"lenv","local_script");
    246479      if(sid!=NULL){
    247480        json_object *jsStr=json_object_new_string(sid->value);
     
    252485    case 7: {
    253486      // Error or Dismiss
    254       sid=getMapFromMaps(m,"lenv","message");
     487      sid=getMapFromMaps(conf,"lenv","message");
    255488      if(sid!=NULL){
    256489        json_object *jsStr=json_object_new_string(sid->value);
     
    265498      }
    266499    }
    267     fprintf(stderr,"************************* %s %d\n\n",__FILE__,__LINE__);
    268     fflush(stderr);
    269     HINTERNET hInternet,res1;
    270     hInternet=InternetOpen("ZooWPSClient\0",
    271                            INTERNET_OPEN_TYPE_PRECONFIG,
    272                            NULL,NULL, 0);
    273     if(!CHECK_INET_HANDLE(hInternet)){
    274       InternetCloseHandle (&hInternet);
     500   
     501    local_params* argumentsA=(local_params*)malloc(MAPS_SIZE+MAP_SIZE+sizeof(json_object*)+(2*sizeof(int)));
     502    argumentsA->conf=conf;
     503    argumentsA->url=url;
     504    argumentsA->res=res;
     505    argumentsA->step=step;
     506    argumentsA->state=state;
     507    //pthread_t p1;
     508    if(myThreads==NULL)
     509      myThreads=(pthread_t*)malloc((nbThreads+1)*sizeof(pthread_t));
     510    else
     511      myThreads=(pthread_t*)realloc(myThreads,(nbThreads+1)*sizeof(pthread_t));
     512    if(pthread_create(&myThreads[nbThreads], NULL, _invokeCallback, (void*)argumentsA)==-1){
     513      setMapInMaps(conf,"lenv","message",_("Unable to create a new thread"));
    275514      return false;
    276515    }
    277     fprintf(stderr," * JSON: [%s] \n",json_object_to_json_string_ext(res,JSON_C_TO_STRING_PLAIN));
    278     fprintf(stderr," * URL: %s%d_%d/ \n\n",url->value,step,state);
    279     fflush(stderr);
    280     char *URL=(char*)malloc((strlen(url->value)+5)*sizeof(char));
    281     sprintf(URL,"%s%d_%d/",url->value,step,state);
    282     hInternet.waitingRequests[0] = zStrdup(URL);
    283     free(URL);
    284     const char* jsonStr=json_object_to_json_string_ext(res,JSON_C_TO_STRING_PLAIN);
    285     res1 = InternetOpenUrl (&hInternet,
    286                      hInternet.waitingRequests[0],
    287                      (char*)jsonStr, strlen(jsonStr),
    288                      INTERNET_FLAG_NO_CACHE_WRITE,
    289                      0);
    290     AddHeaderEntries(&hInternet,m);
    291     //curl_easy_setopt(hInternet.ihandle[hInternet.nb].handle, CURLOPT_VERBOSE, 1);
    292     processDownloads(&hInternet);
    293     char *tmp = (char *) malloc ((hInternet.ihandle[0].nDataLen + 1)
    294                                  * sizeof (char));
    295     if (tmp == NULL)
    296       {
    297         setMapInMaps(m,"lenv","message",_("Unable to allocate memory"));
    298         setMapInMaps(m,"lenv","code","InternalError");
    299         return false;
    300       }
    301     size_t bRead;
    302     InternetReadFile (hInternet.ihandle[0],
    303                       (LPVOID) tmp,
    304                       hInternet.
    305                       ihandle[0].nDataLen,
    306                       &bRead);
    307     tmp[hInternet.ihandle[0].nDataLen] = 0;
    308     fprintf(stderr,"Result: \n%s\n\n",tmp);
    309     fprintf(stderr,"************************* %s %d\n\n",__FILE__,__LINE__);
    310     fflush(stderr);
    311     free(tmp);
    312     json_object_put(res);
    313     InternetCloseHandle(&hInternet);
     516    //free(argumentsA);
     517    nbThreads++;
    314518    return true;
     519  }
     520
     521  void cleanupCallbackThreads(){
     522    int i=0;
     523    for(i=0;i<nbThreads;i++){
     524      pthread_join(myThreads[i],NULL);
     525    }
     526    free(myThreads);
    315527  }
    316528
Note: See TracChangeset for help on using the changeset viewer.

Search

Context Navigation

ZOO Sponsors

http://www.zoo-project.org/trac/chrome/site/img/geolabs-logo.pnghttp://www.zoo-project.org/trac/chrome/site/img/neogeo-logo.png http://www.zoo-project.org/trac/chrome/site/img/apptech-logo.png http://www.zoo-project.org/trac/chrome/site/img/3liz-logo.png http://www.zoo-project.org/trac/chrome/site/img/gateway-logo.png

Become a sponsor !

Knowledge partners

http://www.zoo-project.org/trac/chrome/site/img/ocu-logo.png http://www.zoo-project.org/trac/chrome/site/img/gucas-logo.png http://www.zoo-project.org/trac/chrome/site/img/polimi-logo.png http://www.zoo-project.org/trac/chrome/site/img/fem-logo.png http://www.zoo-project.org/trac/chrome/site/img/supsi-logo.png http://www.zoo-project.org/trac/chrome/site/img/cumtb-logo.png

Become a knowledge partner

Related links

http://zoo-project.org/img/ogclogo.png http://zoo-project.org/img/osgeologo.png