Sleipnir
tools/SeekServer/SeekServer.cpp
00001 /*****************************************************************************
00002 * This file is provided under the Creative Commons Attribution 3.0 license.
00003 *
00004 * You are free to share, copy, distribute, transmit, or adapt this work
00005 * PROVIDED THAT you attribute the work to the authors listed below.
00006 * For more information, please see the following web page:
00007 * http://creativecommons.org/licenses/by/3.0/
00008 *
00009 * This file is a component of the Sleipnir library for functional genomics,
00010 * authored by:
00011 * Curtis Huttenhower (chuttenh@princeton.edu)
00012 * Mark Schroeder
00013 * Maria D. Chikina
00014 * Olga G. Troyanskaya (ogt@princeton.edu, primary contact)
00015 *
00016 * If you use this library, the included executable tools, or any related
00017 * code in your work, please cite the following publication:
00018 * Curtis Huttenhower, Mark Schroeder, Maria D. Chikina, and
00019 * Olga G. Troyanskaya.
00020 * "The Sleipnir library for computational functional genomics"
00021 *****************************************************************************/
#include "stdafx.h"
#include "cmdline.h"

#include <cerrno>
00024 
00025 #define BACKLOG 10   // how many pending connections queue will hold
00026 char *PORT;
00027 int NUM_DSET_MEMORY = 50;
00028 CPCL **pcl;
00029 CSeekCentral *csfinal;
00030 char *available;
00031 map<string, int> DNAME_MAP;
00032 map<int, string> DNAME_RMAP;
00033 
00034 pthread_mutex_t mutexGet;
00035 
00036 void sigchld_handler(int s){
00037     while(waitpid(-1, NULL, WNOHANG) > 0);
00038 }
00039 
00040 // get sockaddr, IPv4 or IPv6:
00041 void *get_in_addr(struct sockaddr *sa){
00042     if (sa->sa_family == AF_INET) {
00043         return &(((struct sockaddr_in*)sa)->sin_addr);
00044     }
00045     return &(((struct sockaddr_in6*)sa)->sin6_addr);
00046 }
00047 
00048 #define NUM_THREADS 8
00049 char THREAD_OCCUPIED[NUM_THREADS];
00050 
00051 struct thread_data{
00052     //CSeekCentral *csf;
00053     string strQuery;
00054     string strOutputDir;
00055     string strSearchDatasets;
00056 
00057     float rbp_p;
00058     float query_fraction_required;
00059     float genome_fraction_required;
00060     string distanceMeasure; //Correlation, Zscore, or ZscoreHubbinessCorrected
00061     string searchMethod; //RBP, EqualWeighting, or OrderStatistics
00062 
00063     int threadid;
00064     int new_fd;
00065 };
00066 
00067 void *do_query(void *th_arg){
00068     struct thread_data *my = (struct thread_data *) th_arg;
00069     int new_fd = my->new_fd;
00070     int threadid = my->threadid;
00071     //CSeekCentral *csf = my->csf;
00072     string strQuery = my->strQuery;
00073     string strOutputDir = my->strOutputDir;
00074     string strSearchDatasets = my->strSearchDatasets;
00075     string distanceMeasure = my->distanceMeasure;
00076     string searchMethod = my->searchMethod;
00077     float rbp_p = my->rbp_p;
00078     float query_fraction_required = my->query_fraction_required;
00079     float genome_fraction_required = my->genome_fraction_required;
00080 
00081     CSeekCentral *csu = new CSeekCentral();
00082     enum CSeekDataset::DistanceMeasure eDM = CSeekDataset::Z_SCORE;
00083     bool bSubtractGeneAvg = false;
00084     bool bNormPlatform = false;
00085 
00086     if(distanceMeasure=="Correlation"){
00087         eDM = CSeekDataset::CORRELATION;
00088     }else if(distanceMeasure=="Zscore"){
00089         //do nothing
00090     }else if(distanceMeasure=="ZscoreHubbinessCorrected"){
00091         bSubtractGeneAvg = true;
00092         bNormPlatform = true;
00093         //bNormPlatform = false;
00094     }
00095 
00096     //fprintf(stderr, "%s\n%s\n%s\n%.2f\n", strOutputDir.c_str(), strQuery.c_str(), strSearchDatasets.c_str(), query_fraction_required);
00097 
00098     bool r = csu->Initialize(strOutputDir, strQuery, strSearchDatasets, csfinal,
00099         new_fd, query_fraction_required, genome_fraction_required, eDM, bSubtractGeneAvg,
00100         bNormPlatform);
00101 
00102     //if r is false, then one of the query has no datasets 
00103     //containing any of the query (because of CheckDatasets() in Initialize()),
00104     //exit in this case
00105     if(r){
00106         if(searchMethod=="EqualWeighting"){
00107             csu->EqualWeightSearch();
00108         }else if(searchMethod=="OrderStatistics"){
00109             csu->OrderStatistics();
00110         }else{
00111             const gsl_rng_type *T;
00112             gsl_rng *rnd;
00113             gsl_rng_env_setup();
00114             T = gsl_rng_default;
00115             rnd = gsl_rng_alloc(T);
00116             gsl_rng_set(rnd, 100);
00117             utype FOLD = 5;
00118             //enum PartitionMode PART_M = CUSTOM_PARTITION;
00119             enum CSeekQuery::PartitionMode PART_M = CSeekQuery::LEAVE_ONE_IN;
00120             csu->CVSearch(rnd, PART_M, FOLD, rbp_p);
00121             gsl_rng_free(rnd);
00122         }
00123     }
00124 
00125     csu->Destruct();
00126     delete csu;
00127 
00128     //fprintf(stderr, "Done search\n"); system("date +%s%N 1>&2");
00129 
00130     /*char *sm = (char*)malloc(12);
00131     sprintf(sm, "Done search");
00132     sm[11] = '\0';
00133     send_msg(new_fd, sm, 12);
00134     free(sm);
00135     */
00136     //Sending back to client, still need to be completed===============
00137     //pthread_mutex_lock(&mutexGet);
00138     //THREAD_OCCUPIED[threadid] = 0;
00139     //close(new_fd);
00140     //pthread_mutex_lock(&mutexGet);
00141 
00142     pthread_mutex_lock(&mutexGet);
00143     close(new_fd);
00144     THREAD_OCCUPIED[threadid] = 0;
00145     pthread_mutex_unlock(&mutexGet);
00146 
00147     int ret = 0;
00148     pthread_exit((void*)ret);
00149 }
00150 
00151 int main( int iArgs, char** aszArgs ) {
00152     static const size_t c_iBuffer   = 1024;
00153 #ifdef WIN32
00154     pthread_win32_process_attach_np( );
00155 #endif // WIN32
00156     gengetopt_args_info sArgs;
00157     int lineSize = 1024;
00158 
00159     if( cmdline_parser( iArgs, aszArgs, &sArgs ) ) {
00160         cmdline_parser_print_help( );
00161         return 1; }
00162 
00163     bool useNibble = false;
00164 
00165     if(sArgs.is_nibble_flag==1){
00166         fprintf(stderr, "Nibble integration is not supported! Please use a non-nibble CDatabase.\n");
00167         useNibble = true;
00168         return 1;
00169     }
00170 
00171     // Random Number Generator Initializations
00172     utype i, j;
00173     bool bOutputWeightComponent = true;
00174     bool bSimulateWeight = true;
00175     bool bSubtractAvg = false;
00176     bool bNormPlatform = false;
00177     bool bLogit = false;
00178 
00179     csfinal = new CSeekCentral();
00180     CSeekDBSetting *dbSetting = new CSeekDBSetting(sArgs.dir_gvar_arg,
00181         sArgs.dir_sinfo_arg, sArgs.dir_platform_arg, sArgs.dir_prep_in_arg,
00182         sArgs.dir_in_arg, sArgs.input_arg, sArgs.quant_arg, sArgs.dset_arg,
00183         sArgs.num_db_arg);
00184     vector<CSeekDBSetting*> cc;
00185     cc.push_back(dbSetting);
00186 
00187     string add_db = sArgs.additional_db_arg;
00188     if(add_db!="NA"){
00189         ifstream ifsm;
00190         ifsm.open(add_db.c_str());
00191         if(!ifsm.is_open()){
00192             fprintf(stderr, "Error opening file %s\n", add_db.c_str());
00193             return false;
00194         }
00195         char acBuffer[lineSize];
00196         utype c_iBuffer = lineSize;
00197         vector<map<string,string> > parameters; //an array of CDatabase's
00198         while(!ifsm.eof()){
00199             ifsm.getline(acBuffer, c_iBuffer-1);
00200             if(acBuffer[0]==0) break;
00201             acBuffer[c_iBuffer-1]=0;
00202             string strB = acBuffer;
00203             if(strB=="START"){
00204                 map<string,string> p;
00205                 while(!ifsm.eof()){
00206                     ifsm.getline(acBuffer, c_iBuffer-1);
00207                     if(acBuffer[0]==0){
00208                         fprintf(stderr, "Invalid line (empty)\n");
00209                         return 1;
00210                     }
00211                     strB = acBuffer;
00212                     if(strB=="END") break;
00213                     vector<string> tok;
00214                     CMeta::Tokenize(acBuffer, tok); //separator is tab
00215                     p[tok[0]] = tok[1];
00216                 }
00217                 parameters.push_back(p);
00218             }
00219         }
00220         ifsm.close();
00221         if(parameters.size()==0){
00222             fprintf(stderr, "Error, extra_db setting file must begin with START and end with END lines\n");
00223             return 1;
00224         }
00225 
00226         for(i=0; i<parameters.size(); i++){
00227         string sinfo_dir = "NA";
00228         string gvar_dir = "NA";
00229         string platform_dir = "NA";
00230         string prep_dir = "NA";
00231         string db_dir = "NA";
00232         string dset_map_file = "NA";
00233         string gene_map_file = "NA";
00234         string quant_file = "NA";
00235         int num_db = -1;
00236 
00237         if(parameters[i].find("SINFO_DIR")->second=="NA"){
00238             fprintf(stderr, "Please specify an sinfo directory for the extra db\n");
00239             return false;
00240         }
00241         sinfo_dir = parameters[i].find("SINFO_DIR")->second;
00242         if(parameters[i].find("GVAR_DIR")!=parameters[i].end())
00243             gvar_dir = parameters[i].find("GVAR_DIR")->second;
00244         if(parameters[i].find("PREP_DIR")==parameters[i].end() ||
00245             parameters[i].find("PLATFORM_DIR")==parameters[i].end() ||
00246             parameters[i].find("DB_DIR")==parameters[i].end() ||
00247             parameters[i].find("DSET_MAP_FILE")==parameters[i].end() ||
00248             parameters[i].find("GENE_MAP_FILE")==parameters[i].end() ||
00249             parameters[i].find("QUANT_FILE")==parameters[i].end() ||
00250             parameters[i].find("NUMBER_OF_DB")==parameters[i].end()){
00251             fprintf(stderr, "Some arguments are missing. Please make sure the following are provided:\n");
00252             fprintf(stderr, "PREP_DIR, DB_DIR, DSET_MAP_FILE, GENE_MAP_FILE, QUANT_FILE, NUMBER_OF_DB\n");
00253             return false;
00254         }
00255 
00256         platform_dir = parameters[i].find("PLATFORM_DIR")->second;
00257         db_dir = parameters[i].find("DB_DIR")->second;
00258         prep_dir = parameters[i].find("PREP_DIR")->second;
00259         dset_map_file = parameters[i].find("DSET_MAP_FILE")->second;
00260         gene_map_file = parameters[i].find("GENE_MAP_FILE")->second;
00261         quant_file = parameters[i].find("QUANT_FILE")->second;
00262         num_db = atoi(parameters[i].find("NUMBER_OF_DB")->second.c_str());
00263 
00264         CSeekDBSetting *dbSetting2 = new CSeekDBSetting(gvar_dir, sinfo_dir,
00265             platform_dir, prep_dir, db_dir, gene_map_file, quant_file, dset_map_file,
00266             num_db);
00267         cc.push_back(dbSetting2);
00268         }
00269     }
00270 
00271     if(!csfinal->Initialize(cc,
00272         //"/tmp/ex_query2.txt", 
00273         sArgs.buffer_arg, !!sArgs.output_text_flag,
00274         bOutputWeightComponent, bSimulateWeight,
00275         CSeekDataset::CORRELATION, //to be overwritten by individual search instance's setting
00276         bSubtractAvg, bNormPlatform, //to be overwritten by individual search instance's settings
00277         bLogit, //always false
00278         sArgs.score_cutoff_arg, 
00279         0.0, //min query fraction (to be overwrriten)
00280         0.0, //min genome fraction (to be overwrriten)
00281         !!sArgs.square_z_flag, //default
00282         false, 1, NULL, useNibble, sArgs.num_threads_arg)) //default
00283     {
00284         fprintf(stderr, "Error occurred!\n");
00285         return -1;
00286     }
00287 
00288     signal(SIGPIPE, SIG_IGN);
00289     //utype i;
00290     for(i=0; i<NUM_THREADS; i++){
00291         THREAD_OCCUPIED[i] = 0;
00292     }
00293 
00294     PORT = sArgs.port_arg;
00295 
00296     int sockfd, new_fd;
00297     struct addrinfo hints, *servinfo, *p;
00298     struct sockaddr_storage their_addr;
00299     socklen_t sin_size;
00300     struct sigaction sa;
00301     char s[INET6_ADDRSTRLEN];
00302     char buf[10];
00303     int rv;
00304     int yes = 1;
00305 
00306     memset(&hints, 0, sizeof(hints));
00307     hints.ai_family=AF_UNSPEC;
00308     hints.ai_socktype = SOCK_STREAM;
00309     hints.ai_flags = AI_PASSIVE;
00310 
00311     if((rv=getaddrinfo(NULL, PORT, &hints, &servinfo))!=0){
00312         fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
00313         return 1;
00314     }
00315 
00316     // loop through all the results and bind to the first we can
00317     for(p = servinfo; p != NULL; p = p->ai_next) {
00318         if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
00319             perror("server: socket");
00320             continue;
00321         }
00322         if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
00323             perror("setsockopt");
00324             exit(1);
00325         }
00326         if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
00327             close(sockfd);
00328             perror("server: bind");
00329             continue;
00330         }
00331         break;
00332     }
00333 
00334     if (p == NULL)  {
00335         fprintf(stderr, "server: failed to bind\n");
00336         return 2;
00337     }
00338 
00339     freeaddrinfo(servinfo); // all done with this structure
00340 
00341     if (listen(sockfd, BACKLOG) == -1) {
00342         perror("listen");
00343         exit(1);
00344     }
00345 
00346     sa.sa_handler = sigchld_handler; // reap all dead processes
00347     sigemptyset(&sa.sa_mask);
00348     sa.sa_flags = SA_RESTART;
00349     if (sigaction(SIGCHLD, &sa, NULL) == -1) {
00350         perror("sigaction");
00351         exit(1);
00352     }
00353 
00354     printf("server: waiting for connections...\n");
00355     struct thread_data thread_arg[NUM_THREADS];
00356     pthread_t th[NUM_THREADS];
00357 
00358     pthread_mutex_init(&mutexGet, NULL);
00359 
00360     while(1){
00361         sin_size = sizeof their_addr;
00362         new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size);
00363         if(new_fd==-1){
00364             perror("accept");
00365             continue;
00366         }
00367         inet_ntop(their_addr.ss_family, get_in_addr((struct sockaddr *)&their_addr), s, sizeof s);
00368         printf("server, got connection from %s\n", s);
00369     
00370         int d = 0;
00371         pthread_mutex_lock(&mutexGet);
00372         for(d=0; d<NUM_THREADS; d++){
00373             if(THREAD_OCCUPIED[d]==0) break;
00374         }
00375 
00376         if(d==NUM_THREADS){
00377             close(new_fd); 
00378             pthread_mutex_unlock(&mutexGet);
00379             continue;
00380         }
00381 
00382         THREAD_OCCUPIED[d] = 1;
00383         pthread_mutex_unlock(&mutexGet);
00384 
00385         //receiving query from client, still need to be completed
00386         //only needs to receive three strings,
00387         //queryFile, searchdatasetFile, outputDir
00388 
00389         string strSearchDataset;
00390         string strQuery;
00391         string strOutputDir;
00392 
00393         //search parameters
00394         string strSearchParameter; 
00395         //format: _ delimited       
00396         //[0]: search_method ("RBP", "OrderStatistics", or "EqualWeighting") 
00397         //[1]: rbp_p
00398         //[2]: minimum fraction of query required
00399         //[3]: minimum fraction of genome required
00400         //[4]: distance measure ("Correlation", "Zscore", or "ZscoreHubbinessCorrected")
00401 
00402         if(CSeekNetwork::Receive(new_fd, strSearchDataset)==-1){
00403             fprintf(stderr, "Error receiving from client!\n");
00404         }
00405 
00406         if(CSeekNetwork::Receive(new_fd, strQuery)==-1){
00407             fprintf(stderr, "Error receiving from client!\n");
00408         }
00409 
00410         if(CSeekNetwork::Receive(new_fd, strOutputDir)==-1){
00411             fprintf(stderr, "Error receiving from client!\n");
00412         }
00413 
00414         if(CSeekNetwork::Receive(new_fd, strSearchParameter)==-1){
00415             fprintf(stderr, "Error receiving from client!\n");
00416         }
00417 
00418         vector<string> searchParameterTokens;
00419         fprintf(stderr, "%s\n", strSearchParameter.c_str());
00420         CMeta::Tokenize(strSearchParameter.c_str(), searchParameterTokens, "_");
00421         thread_arg[d].searchMethod = searchParameterTokens[0];
00422         thread_arg[d].rbp_p = atof(searchParameterTokens[1].c_str());
00423         thread_arg[d].query_fraction_required = 
00424             atof(searchParameterTokens[2].c_str());
00425         thread_arg[d].genome_fraction_required = 
00426             atof(searchParameterTokens[3].c_str());
00427         thread_arg[d].distanceMeasure = searchParameterTokens[4];
00428 
00429         //=========================================================
00430 
00431         thread_arg[d].threadid = d;
00432         thread_arg[d].new_fd = new_fd;
00433         thread_arg[d].strQuery = strQuery;
00434         thread_arg[d].strOutputDir = strOutputDir;
00435         thread_arg[d].strSearchDatasets = strSearchDataset;
00436         //thread_arg[d].csfinal = &csfinal;
00437 
00438         int ret;
00439         pthread_create(&th[d], NULL, do_query, (void*) &thread_arg[d]);
00440 
00441         //int ret;
00442         //pthread_create(&th[d], NULL, do_query, (void *) &thread_arg[d]);
00443         //pthread_join(th[d], (void **)&ret);
00444 
00445     }
00446 
00447 #ifdef WIN32
00448     pthread_win32_process_detach_np( );
00449 #endif // WIN32
00450     return 0; 
00451 
00452 }