- Timestamp:
- Dec 12, 2017, 4:09:47 PM (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/prototype-v0/zoo-project/zoo-kernel/service_internal_hpc.c
r854 r860 285 285 * @param real_inputs the maps containing the inputs 286 286 * @param real_outputs the maps containing the outputs 287 * @return SERVICE_SUCCEEDED in case of success, -1 or SERVICE_FAILED when failing. 287 288 */ 288 289 int zoo_hpc_support(maps** main_conf,map* request,service* s,maps **real_inputs,maps **real_outputs){ … … 391 392 } 392 393 393 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 394 invokeCallback(m,inputs,NULL,2,0);395 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 396 dumpMaps(inputs);394 #ifdef HPC_DEBUG 395 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 396 #endif 397 invokeCallback(m,inputs,NULL,1,1); 397 398 if(getMapFromMaps(m,"lenv","mapError")!=NULL){ 398 399 invokeCallback(m,inputs,NULL,7,0); 399 400 return -1; 400 401 } 402 invokeCallback(m,inputs,NULL,2,0); 403 #ifdef HPC_DEBUG 404 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 405 dumpMaps(inputs); 406 #endif 401 407 402 408 // Upload data on HPC … … 404 410 errorException (m, _("Unable to lock the file for upload!"), 405 411 "InternalError", NULL); 406 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 412 #ifdef HPC_DEBUG 413 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 414 #endif 407 415 invokeCallback(m,inputs,NULL,7,0); 408 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 416 #ifdef HPC_DEBUG 417 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 418 #endif 409 419 return -1; 410 420 } 411 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 421 #ifdef HPC_DEBUG 422 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 423 #endif 412 424 invokeCallback(m,inputs,NULL,2,1); 413 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 425 #ifdef HPC_DEBUG 426 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 427 #endif 414 428 415 429 // Add the filename to generate for every output to parameters … … 417 431 // TODO: fix appendOutputParameters 418 432 //appendOutputParameters(input,parameters,¶meters_cnt,s,uuid,targetPathMap); 433 #ifdef HPC_DEBUG 419 434 dumpMaps(input); 435 #endif 420 436 while(input!=NULL){ 421 437 // TODO: parse all outputs including inner outputs if required. … … 509 525 sprintf(scriptPath,"%s/zoo_%s_%s.sh",tmpPath->value,s->name,uuid->value); 510 526 setMapInMaps(*main_conf,"lenv","local_script",scriptPath); 527 #ifdef HPC_DEBUG 511 528 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 512 529 fflush(stderr); 530 #endif 513 531 invokeCallback(m,inputs,NULL,3,0); 532 #ifdef HPC_DEBUG 514 533 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 515 534 fflush(stderr); 535 #endif 516 536 FILE* scriptFile=fopen(scriptPath,"w+"); 517 537 map* headerMap=getMapFromMaps(*main_conf,configurationId,"jobscript_header"); … … 562 582 free(fcontent); 563 583 }else 564 fprintf(scriptFile," #!/bin/bash\n\n### *** Default ZOO-Service BODY (no body found) *** ###\n\n");584 fprintf(scriptFile,"\n### *** Default ZOO-Service BODY (no body found) *** ###\n\n"); 565 585 }else 566 fprintf(scriptFile," #!/bin/bash\n\n### *** Default ZOO-Service BODY *** ###\n\n");586 fprintf(scriptFile,"\n### *** Default ZOO-Service BODY *** ###\n\n"); 567 587 568 588 map* sp=getMap(s->content,"serviceProvider"); … … 583 603 fflush(scriptFile); 584 604 fclose(scriptFile); 585 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 605 #ifdef HPC_DEBUG 606 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 607 #endif 586 608 invokeCallback(m,inputs,NULL,3,1); 587 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 609 #ifdef HPC_DEBUG 610 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 611 #endif 588 612 589 613 // Upload the SBATCH File to the remote host 590 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 614 #ifdef HPC_DEBUG 615 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 616 #endif 591 617 invokeCallback(m,inputs,NULL,4,0); 592 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 618 #ifdef HPC_DEBUG 619 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 620 #endif 593 621 targetPathMap=getMapFromMaps(*main_conf,configurationId,"remote_work_path"); 594 622 if(targetPathMap==NULL){ … … 597 625 errorException (m, _("There is no remote_work_path defined in your section!"), 598 626 "InternalError", NULL); 627 #ifdef HPC_DEBUG 599 628 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 600 629 fflush(stderr); 630 #endif 601 631 invokeCallback(m,NULL,NULL,7,0); 632 #ifdef HPC_DEBUG 602 633 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 603 634 fflush(stderr); 635 #endif 604 636 return SERVICE_FAILED; 605 637 } … … 609 641 setMapInMaps(*main_conf,"lenv","remote_script",targetPath); 610 642 SSHCON *test=ssh_connect(*main_conf); 611 ssh_copy(*main_conf,scriptPath,targetPath,ssh_get_cnt(*main_conf));643 int copy0=ssh_copy(*main_conf,scriptPath,targetPath,ssh_get_cnt(*main_conf)); 612 644 unlink(scriptPath); 613 645 free(scriptPath); 646 if(copy0!=true){ 647 setMapInMaps(m,"lenv","message",_("Unable to upload the script")); 648 invokeCallback(m,NULL,NULL,7,0); 649 errorException(m,_("Unable to upload the script"),"NoApplicableCode",NULL); 650 return -1; 651 } 614 652 // Execute the SBATCH script remotely 615 653 addReadLocks(main_conf); … … 617 655 char *command=(char*)malloc((strlen(targetPath)+strlen(targetPathMap->value)+strlen(subStr->value)+strlen(uuid->value)+137)*sizeof(char)); 618 656 sprintf(command,"sbatch %s 2> %s/error_%s.log | sed \"s:%s::g\"",targetPath,targetPathMap->value,uuid->value,subStr->value); 619 if(ssh_exec(*main_conf,command,ssh_get_cnt(m)) ==0){657 if(ssh_exec(*main_conf,command,ssh_get_cnt(m))<=0){ 620 658 // The sbatch command has failed! 621 659 // Download the error log file from the HPC server … … 645 683 }else 646 684 setMapInMaps(*main_conf,"lenv","message",_("Unable to fetch the remote error log file")); 647 tmpPath=getMapFromMaps(*main_conf,"lenv","message"); 685 tmpPath=getMapFromMaps(m,"lenv","message"); 686 #ifdef HPC_DEBUG 648 687 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 649 688 fflush(stderr); 689 #endif 650 690 invokeCallback(m,NULL,NULL,7,0); 691 #ifdef HPC_DEBUG 651 692 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 652 693 fflush(stderr); 653 sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s: %s", s->name, tmpPath->value); 694 #endif 695 sprintf(tmpS, "Cannot execute the HPC ZOO-Service %s using %s: %s", s->name, configurationId, tmpPath->value); 654 696 errorException(m,tmpS,"NoApplicableCode",NULL); 655 697 free(command); 656 698 free(targetPath); 657 699 ssh_close(*main_conf); 700 removeReadLocks(main_conf); 658 701 sleep(1); 659 702 return -1; 660 703 } 661 704 free(targetPath); 705 #ifdef HPC_DEBUG 662 706 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 663 707 fflush(stderr); 708 #endif 664 709 invokeCallback(m,NULL,NULL,4,1); 710 #ifdef HPC_DEBUG 665 711 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 666 712 fflush(stderr); 713 #endif 667 714 free(command); 715 #ifdef HPC_DEBUG 668 716 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 669 717 fflush(stderr); 718 #endif 670 719 671 720 struct sockaddr_un addr; 672 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);673 fflush(stderr);674 721 memset(&addr, 0, sizeof(addr)); 675 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);676 fflush(stderr);677 722 addr.sun_family = AF_UNIX; 678 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);679 fflush(stderr);680 723 int rc, cl, fd = socket(AF_UNIX, SOCK_STREAM, 0); 681 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);682 fflush(stderr);683 724 char *sname=(char*)malloc((strlen(tmpPath->value)+strlen(uuid->value)+20)); 684 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);685 fflush(stderr);686 725 sprintf(sname,"%s/.wait_socket_%s.sock",tmpPath->value,uuid->value); 687 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);688 fflush(stderr);689 726 strncpy(addr.sun_path, sname, sizeof(addr.sun_path)-1); 690 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);691 fflush(stderr);692 727 693 728 if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { … … 696 731 errorException (m, _("Unable to bind socket!"), 697 732 "InternalError", NULL); 733 #ifdef HPC_DEBUG 698 734 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 699 735 fflush(stderr); 736 #endif 700 737 invokeCallback(m,NULL,NULL,7,0); 738 removeReadLocks(main_conf); 739 #ifdef HPC_DEBUG 701 740 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 702 741 fflush(stderr); 703 sleep(120); 742 #endif 704 743 return -1; 705 744 } 745 #ifdef HPC_DEBUG 706 746 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 707 747 fflush(stderr); 748 #endif 708 749 if (listen(fd, 5) == -1) { 709 750 setMapInMaps(*main_conf,"lenv","message",_("Listen error")); 710 751 errorException (m, _("Listen error"), 711 752 "InternalError", NULL); 753 #ifdef HPC_DEBUG 712 754 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 713 755 fflush(stderr); 756 #endif 714 757 invokeCallback(m,NULL,NULL,7,0); 758 removeReadLocks(main_conf); 759 #ifdef HPC_DEBUG 715 760 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 716 761 fflush(stderr); 762 #endif 717 763 return -1; 718 764 } 719 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);720 fflush(stderr);721 765 if ( (cl = accept(fd, NULL, NULL)) == -1) { 722 766 setMapInMaps(*main_conf,"lenv","message",_("Accept error")); 723 767 errorException (m, _("Accept error"), 724 768 "InternalError", NULL); 769 #ifdef HPC_DEBUG 725 770 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 726 771 fflush(stderr); 772 #endif 727 773 invokeCallback(m,NULL,NULL,7,0); 774 removeReadLocks(main_conf); 775 #ifdef HPC_DEBUG 728 776 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 729 777 fflush(stderr); 778 #endif 730 779 return -1; 731 780 }else{ 781 #ifdef HPC_DEBUG 732 782 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 733 783 fflush(stderr); 784 #endif 734 785 int hasPassed=-1; 735 786 char buf[11]; … … 741 792 errorException (m, _("Read closed"), 742 793 "InternalError", NULL); 794 #ifdef HPC_DEBUG 743 795 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 744 796 fflush(stderr); 797 #endif 745 798 invokeCallback(m,NULL,NULL,7,0); 799 removeReadLocks(main_conf); 800 #ifdef HPC_DEBUG 746 801 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 747 802 fflush(stderr); 803 #endif 748 804 return -1; 749 805 }else{ … … 752 808 errorException (m, _("Read error"), 753 809 "InternalError", NULL); 810 #ifdef HPC_DEBUG 754 811 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 755 812 fflush(stderr); 813 #endif 756 814 invokeCallback(m,NULL,NULL,7,0); 815 removeReadLocks(main_conf); 816 #ifdef HPC_DEBUG 757 817 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 758 818 fflush(stderr); 819 #endif 759 820 return -1; 760 821 } 761 822 } 762 823 hasPassed=1; 763 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);764 fflush(stderr);765 824 res=atoi(buf); 766 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);767 fflush(stderr);768 825 unlink(sname); 769 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);770 fflush(stderr);771 826 free(sname); 772 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);773 fflush(stderr);774 827 removeReadLocks(main_conf); 775 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__);776 fflush(stderr);777 828 778 829 if(res==3){ 830 #ifdef HPC_DEBUG 779 831 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 780 832 fflush(stderr); 833 #endif 781 834 invokeCallback(m,NULL,outputs,5,0); 835 #ifdef HPC_DEBUG 782 836 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 783 837 fflush(stderr); 838 #endif 839 840 // Read informations provided by FinalizeHPC as a configuration file 841 // then, remove the file. 842 map* jobid=getMapFromMaps(*main_conf,"lenv","usid"); 843 map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath"); 844 char *filePath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+15)*sizeof(char)); 845 sprintf(filePath,"%s/exec_status_%s",tmpPath->value,jobid->value); 846 maps* m = (maps *) malloc (MAPS_SIZE); 847 m->child=NULL; 848 m->next=NULL; 849 int saved_stdout = dup (fileno (stdout)); 850 dup2 (fileno (stderr), fileno (stdout)); 851 conf_read(filePath,m); 852 fflush(stdout); 853 dup2 (saved_stdout, fileno (stdout)); 854 close(saved_stdout); 855 unlink(filePath); 856 free(filePath); 857 addMapsToMaps(main_conf,m); 858 freeMaps(&m); 859 free(m); 860 784 861 input=*real_outputs; 785 862 while(input!=NULL){ … … 797 874 char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char)); 798 875 sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename); 799 setMapInMaps( *main_conf,"lenv","message",tmpStr);876 setMapInMaps(m,"lenv","message",tmpStr); 800 877 free(tmpStr); 801 878 invokeCallback(m,NULL,NULL,7,0); … … 804 881 } 805 882 }else{ 806 fprintf(stderr,"%s %d\n",__FILE__,__LINE__);807 fflush(stderr);808 883 map* generatedFile=getMap(input->content,"generated_file"); 809 884 if(generatedFile!=NULL){ … … 834 909 sprintf(serviceName,"wcs_link"); 835 910 setMapInMaps(output->child,serviceName,"msOgc","WCS"); 911 setMapInMaps(output->child,serviceName,"requestedMimeType","image/tiff"); 836 912 } 837 913 else{ 838 914 sprintf(serviceName,"wfs_link"); 839 915 setMapInMaps(output->child,serviceName,"msOgc","WFS"); 916 setMapInMaps(output->child,serviceName,"requestedMimeType","text/xml"); 840 917 } 841 918 setMapInMaps(output->child,serviceName,"storage",targetPath); … … 844 921 setMapInMaps(output->child,serviceName,"asReference","true"); 845 922 }else{ 846 char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char)); 847 sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename); 848 setMapInMaps(*main_conf,"lenv","message",tmpStr); 849 free(tmpStr); 850 invokeCallback(m,NULL,NULL,7,0); 923 map* hpcStdErr=getMapFromMaps(*main_conf,"henv","StdErr"); 924 if(hpcStdErr!=NULL && ssh_fetch(*main_conf,targetPath,hpcStdErr->value,ssh_get_cnt(m))==0){ 925 struct stat f_status; 926 int ts=stat(targetPath, &f_status); 927 if(ts==0) { 928 char* fcontent = NULL; 929 fcontent=(char*)malloc(sizeof(char)*(f_status.st_size+1)); 930 FILE* f=fopen(targetPath,"rb"); 931 fread(fcontent,f_status.st_size,1,f); 932 int fsize=f_status.st_size; 933 fcontent[fsize]=0; 934 fclose(f); 935 setMapInMaps(*main_conf,"lenv","message",fcontent); 936 free(fcontent); 937 }else{ 938 char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char)); 939 sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename); 940 setMapInMaps(*main_conf,"lenv","message",tmpStr); 941 free(tmpStr); 942 } 943 }else{ 944 char *tmpStr=(char*)malloc((strlen(filename)+strlen(_("Unable to fetch the remote file for %s"))+1)*sizeof(char)); 945 sprintf(tmpStr,_("Unable to fetch the remote file for %s"),filename); 946 setMapInMaps(*main_conf,"lenv","message",tmpStr); 947 free(tmpStr); 948 } 949 invokeCallback(*main_conf,NULL,NULL,7,0); 851 950 return SERVICE_FAILED; 852 951 } … … 857 956 } 858 957 859 // Read informations provided by FinalizeHPC as a configuration file860 // then, remove the file.861 map* jobid=getMapFromMaps(*main_conf,"lenv","usid");862 map* tmpPath=getMapFromMaps(*main_conf,"main","tmpPath");863 char *filePath=(char*)malloc((strlen(tmpPath->value)+strlen(jobid->value)+15)*sizeof(char));864 sprintf(filePath,"%s/exec_status_%s",tmpPath->value,jobid->value);865 maps* m = (maps *) malloc (MAPS_SIZE);866 m->child=NULL;867 m->next=NULL;868 int saved_stdout = dup (fileno (stdout));869 dup2 (fileno (stderr), fileno (stdout));870 conf_read(filePath,m);871 fflush(stdout);872 dup2 (saved_stdout, fileno (stdout));873 close(saved_stdout);874 unlink(filePath);875 free(filePath);876 addMapsToMaps(main_conf,m);877 freeMaps(&m);878 free(m);879 958 }else{ 880 959 // Try to access remotely to the log file and return a more relevant error message … … 882 961 errorException (m, _("HPC Execution failed!"), 883 962 "InternalError", NULL); 963 #ifdef HPC_DEBUG 884 964 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 885 965 fflush(stderr); 966 #endif 886 967 invokeCallback(m,NULL,NULL,7,0); 968 #ifdef HPC_DEBUG 887 969 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 888 970 fflush(stderr); 971 #endif 889 972 } 890 973 //free(buf); … … 895 978 errorException (m, _("Unable to parse the value returned by remote execution"), 896 979 "InternalError", NULL); 980 #ifdef HPC_DEBUG 897 981 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 898 982 fflush(stderr); 983 #endif 899 984 invokeCallback(m,NULL,NULL,7,0); 985 #ifdef HPC_DEBUG 900 986 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 901 987 fflush(stderr); 902 sleep(120); 988 #endif 903 989 return SERVICE_FAILED; 904 990 } 905 991 } 992 #ifdef HPC_DEBUG 906 993 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 907 994 fflush(stderr); 995 #endif 908 996 ssh_close(*main_conf); 997 #ifdef HPC_DEBUG 909 998 fprintf(stderr,"************************* %s %d \n\n",__FILE__,__LINE__); 910 999 fflush(stderr); 1000 #endif 911 1001 return res; 912 1002 }
Note: See TracChangeset
for help on using the changeset viewer.