Sleipnir
|
00001 /***************************************************************************** 00002 * This file is provided under the Creative Commons Attribution 3.0 license. 00003 * 00004 * You are free to share, copy, distribute, transmit, or adapt this work 00005 * PROVIDED THAT you attribute the work to the authors listed below. 00006 * For more information, please see the following web page: 00007 * http://creativecommons.org/licenses/by/3.0/ 00008 * 00009 * This file is a component of the Sleipnir library for functional genomics, 00010 * authored by: 00011 * Curtis Huttenhower (chuttenh@princeton.edu) 00012 * Mark Schroeder 00013 * Maria D. Chikina 00014 * Olga G. Troyanskaya (ogt@princeton.edu, primary contact) 00015 * 00016 * If you use this library, the included executable tools, or any related 00017 * code in your work, please cite the following publication: 00018 * Curtis Huttenhower, Mark Schroeder, Maria D. Chikina, and 00019 * Olga G. Troyanskaya. 00020 * "The Sleipnir library for computational functional genomics" 00021 *****************************************************************************/ 00022 #include "stdafx.h" 00023 #include "cmdline.h" 00024 00025 #define BACKLOG 10 // how many pending connections queue will hold 00026 char *PORT; 00027 int NUM_DSET_MEMORY = 50; 00028 CPCL **pcl; 00029 CSeekCentral *csfinal; 00030 char *available; 00031 map<string, int> DNAME_MAP; 00032 map<int, string> DNAME_RMAP; 00033 00034 pthread_mutex_t mutexGet; 00035 00036 void sigchld_handler(int s){ 00037 while(waitpid(-1, NULL, WNOHANG) > 0); 00038 } 00039 00040 // get sockaddr, IPv4 or IPv6: 00041 void *get_in_addr(struct sockaddr *sa){ 00042 if (sa->sa_family == AF_INET) { 00043 return &(((struct sockaddr_in*)sa)->sin_addr); 00044 } 00045 return &(((struct sockaddr_in6*)sa)->sin6_addr); 00046 } 00047 00048 #define NUM_THREADS 8 00049 char THREAD_OCCUPIED[NUM_THREADS]; 00050 00051 struct thread_data{ 00052 //CSeekCentral *csf; 00053 string strQuery; 00054 string strOutputDir; 00055 string strSearchDatasets; 00056 00057 float rbp_p; 00058 float query_fraction_required; 00059 float genome_fraction_required; 00060 string distanceMeasure; //Correlation, Zscore, or ZscoreHubbinessCorrected 00061 string searchMethod; //RBP, EqualWeighting, or OrderStatistics 00062 00063 int threadid; 00064 int new_fd; 00065 }; 00066 00067 void *do_query(void *th_arg){ 00068 struct thread_data *my = (struct thread_data *) th_arg; 00069 int new_fd = my->new_fd; 00070 int threadid = my->threadid; 00071 //CSeekCentral *csf = my->csf; 00072 string strQuery = my->strQuery; 00073 string strOutputDir = my->strOutputDir; 00074 string strSearchDatasets = my->strSearchDatasets; 00075 string distanceMeasure = my->distanceMeasure; 00076 string searchMethod = my->searchMethod; 00077 float rbp_p = my->rbp_p; 00078 float query_fraction_required = my->query_fraction_required; 00079 float genome_fraction_required = my->genome_fraction_required; 00080 00081 CSeekCentral *csu = new CSeekCentral(); 00082 enum CSeekDataset::DistanceMeasure eDM = CSeekDataset::Z_SCORE; 00083 bool bSubtractGeneAvg = false; 00084 bool bNormPlatform = false; 00085 00086 if(distanceMeasure=="Correlation"){ 00087 eDM = CSeekDataset::CORRELATION; 00088 }else if(distanceMeasure=="Zscore"){ 00089 //do nothing 00090 }else if(distanceMeasure=="ZscoreHubbinessCorrected"){ 00091 bSubtractGeneAvg = true; 00092 bNormPlatform = true; 00093 //bNormPlatform = false; 00094 } 00095 00096 //fprintf(stderr, "%s\n%s\n%s\n%.2f\n", strOutputDir.c_str(), strQuery.c_str(), strSearchDatasets.c_str(), query_fraction_required); 00097 00098 bool r = csu->Initialize(strOutputDir, strQuery, strSearchDatasets, csfinal, 00099 new_fd, query_fraction_required, genome_fraction_required, eDM, bSubtractGeneAvg, 00100 bNormPlatform); 00101 00102 //if r is false, then one of the query has no datasets 00103 //containing any of the query (because of CheckDatasets() in Initialize()), 00104 //exit in this case 00105 if(r){ 00106 if(searchMethod=="EqualWeighting"){ 00107 csu->EqualWeightSearch(); 00108 }else if(searchMethod=="OrderStatistics"){ 00109 csu->OrderStatistics(); 00110 }else{ 00111 const gsl_rng_type *T; 00112 gsl_rng *rnd; 00113 gsl_rng_env_setup(); 00114 T = gsl_rng_default; 00115 rnd = gsl_rng_alloc(T); 00116 gsl_rng_set(rnd, 100); 00117 utype FOLD = 5; 00118 //enum PartitionMode PART_M = CUSTOM_PARTITION; 00119 enum CSeekQuery::PartitionMode PART_M = CSeekQuery::LEAVE_ONE_IN; 00120 csu->CVSearch(rnd, PART_M, FOLD, rbp_p); 00121 gsl_rng_free(rnd); 00122 } 00123 } 00124 00125 csu->Destruct(); 00126 delete csu; 00127 00128 //fprintf(stderr, "Done search\n"); system("date +%s%N 1>&2"); 00129 00130 /*char *sm = (char*)malloc(12); 00131 sprintf(sm, "Done search"); 00132 sm[11] = '\0'; 00133 send_msg(new_fd, sm, 12); 00134 free(sm); 00135 */ 00136 //Sending back to client, still need to be completed=============== 00137 //pthread_mutex_lock(&mutexGet); 00138 //THREAD_OCCUPIED[threadid] = 0; 00139 //close(new_fd); 00140 //pthread_mutex_lock(&mutexGet); 00141 00142 pthread_mutex_lock(&mutexGet); 00143 close(new_fd); 00144 THREAD_OCCUPIED[threadid] = 0; 00145 pthread_mutex_unlock(&mutexGet); 00146 00147 int ret = 0; 00148 pthread_exit((void*)ret); 00149 } 00150 00151 int main( int iArgs, char** aszArgs ) { 00152 static const size_t c_iBuffer = 1024; 00153 #ifdef WIN32 00154 pthread_win32_process_attach_np( ); 00155 #endif // WIN32 00156 gengetopt_args_info sArgs; 00157 int lineSize = 1024; 00158 00159 if( cmdline_parser( iArgs, aszArgs, &sArgs ) ) { 00160 cmdline_parser_print_help( ); 00161 return 1; } 00162 00163 bool useNibble = false; 00164 00165 if(sArgs.is_nibble_flag==1){ 00166 fprintf(stderr, "Nibble integration is not supported! Please use a non-nibble CDatabase.\n"); 00167 useNibble = true; 00168 return 1; 00169 } 00170 00171 // Random Number Generator Initializations 00172 utype i, j; 00173 bool bOutputWeightComponent = true; 00174 bool bSimulateWeight = true; 00175 bool bSubtractAvg = false; 00176 bool bNormPlatform = false; 00177 bool bLogit = false; 00178 00179 csfinal = new CSeekCentral(); 00180 CSeekDBSetting *dbSetting = new CSeekDBSetting(sArgs.dir_gvar_arg, 00181 sArgs.dir_sinfo_arg, sArgs.dir_platform_arg, sArgs.dir_prep_in_arg, 00182 sArgs.dir_in_arg, sArgs.input_arg, sArgs.quant_arg, sArgs.dset_arg, 00183 sArgs.num_db_arg); 00184 vector<CSeekDBSetting*> cc; 00185 cc.push_back(dbSetting); 00186 00187 string add_db = sArgs.additional_db_arg; 00188 if(add_db!="NA"){ 00189 ifstream ifsm; 00190 ifsm.open(add_db.c_str()); 00191 if(!ifsm.is_open()){ 00192 fprintf(stderr, "Error opening file %s\n", add_db.c_str()); 00193 return false; 00194 } 00195 char acBuffer[lineSize]; 00196 utype c_iBuffer = lineSize; 00197 vector<map<string,string> > parameters; //an array of CDatabase's 00198 while(!ifsm.eof()){ 00199 ifsm.getline(acBuffer, c_iBuffer-1); 00200 if(acBuffer[0]==0) break; 00201 acBuffer[c_iBuffer-1]=0; 00202 string strB = acBuffer; 00203 if(strB=="START"){ 00204 map<string,string> p; 00205 while(!ifsm.eof()){ 00206 ifsm.getline(acBuffer, c_iBuffer-1); 00207 if(acBuffer[0]==0){ 00208 fprintf(stderr, "Invalid line (empty)\n"); 00209 return 1; 00210 } 00211 strB = acBuffer; 00212 if(strB=="END") break; 00213 vector<string> tok; 00214 CMeta::Tokenize(acBuffer, tok); //separator is tab 00215 p[tok[0]] = tok[1]; 00216 } 00217 parameters.push_back(p); 00218 } 00219 } 00220 ifsm.close(); 00221 if(parameters.size()==0){ 00222 fprintf(stderr, "Error, extra_db setting file must begin with START and end with END lines\n"); 00223 return 1; 00224 } 00225 00226 for(i=0; i<parameters.size(); i++){ 00227 string sinfo_dir = "NA"; 00228 string gvar_dir = "NA"; 00229 string platform_dir = "NA"; 00230 string prep_dir = "NA"; 00231 string db_dir = "NA"; 00232 string dset_map_file = "NA"; 00233 string gene_map_file = "NA"; 00234 string quant_file = "NA"; 00235 int num_db = -1; 00236 00237 if(parameters[i].find("SINFO_DIR")->second=="NA"){ 00238 fprintf(stderr, "Please specify an sinfo directory for the extra db\n"); 00239 return false; 00240 } 00241 sinfo_dir = parameters[i].find("SINFO_DIR")->second; 00242 if(parameters[i].find("GVAR_DIR")!=parameters[i].end()) 00243 gvar_dir = parameters[i].find("GVAR_DIR")->second; 00244 if(parameters[i].find("PREP_DIR")==parameters[i].end() || 00245 parameters[i].find("PLATFORM_DIR")==parameters[i].end() || 00246 parameters[i].find("DB_DIR")==parameters[i].end() || 00247 parameters[i].find("DSET_MAP_FILE")==parameters[i].end() || 00248 parameters[i].find("GENE_MAP_FILE")==parameters[i].end() || 00249 parameters[i].find("QUANT_FILE")==parameters[i].end() || 00250 parameters[i].find("NUMBER_OF_DB")==parameters[i].end()){ 00251 fprintf(stderr, "Some arguments are missing. Please make sure the following are provided:\n"); 00252 fprintf(stderr, "PREP_DIR, DB_DIR, DSET_MAP_FILE, GENE_MAP_FILE, QUANT_FILE, NUMBER_OF_DB\n"); 00253 return false; 00254 } 00255 00256 platform_dir = parameters[i].find("PLATFORM_DIR")->second; 00257 db_dir = parameters[i].find("DB_DIR")->second; 00258 prep_dir = parameters[i].find("PREP_DIR")->second; 00259 dset_map_file = parameters[i].find("DSET_MAP_FILE")->second; 00260 gene_map_file = parameters[i].find("GENE_MAP_FILE")->second; 00261 quant_file = parameters[i].find("QUANT_FILE")->second; 00262 num_db = atoi(parameters[i].find("NUMBER_OF_DB")->second.c_str()); 00263 00264 CSeekDBSetting *dbSetting2 = new CSeekDBSetting(gvar_dir, sinfo_dir, 00265 platform_dir, prep_dir, db_dir, gene_map_file, quant_file, dset_map_file, 00266 num_db); 00267 cc.push_back(dbSetting2); 00268 } 00269 } 00270 00271 if(!csfinal->Initialize(cc, 00272 //"/tmp/ex_query2.txt", 00273 sArgs.buffer_arg, !!sArgs.output_text_flag, 00274 bOutputWeightComponent, bSimulateWeight, 00275 CSeekDataset::CORRELATION, //to be overwritten by individual search instance's setting 00276 bSubtractAvg, bNormPlatform, //to be overwritten by individual search instance's settings 00277 bLogit, //always false 00278 sArgs.score_cutoff_arg, 00279 0.0, //min query fraction (to be overwrriten) 00280 0.0, //min genome fraction (to be overwrriten) 00281 !!sArgs.square_z_flag, //default 00282 false, 1, NULL, useNibble, sArgs.num_threads_arg)) //default 00283 { 00284 fprintf(stderr, "Error occurred!\n"); 00285 return -1; 00286 } 00287 00288 signal(SIGPIPE, SIG_IGN); 00289 //utype i; 00290 for(i=0; i<NUM_THREADS; i++){ 00291 THREAD_OCCUPIED[i] = 0; 00292 } 00293 00294 PORT = sArgs.port_arg; 00295 00296 int sockfd, new_fd; 00297 struct addrinfo hints, *servinfo, *p; 00298 struct sockaddr_storage their_addr; 00299 socklen_t sin_size; 00300 struct sigaction sa; 00301 char s[INET6_ADDRSTRLEN]; 00302 char buf[10]; 00303 int rv; 00304 int yes = 1; 00305 00306 memset(&hints, 0, sizeof(hints)); 00307 hints.ai_family=AF_UNSPEC; 00308 hints.ai_socktype = SOCK_STREAM; 00309 hints.ai_flags = AI_PASSIVE; 00310 00311 if((rv=getaddrinfo(NULL, PORT, &hints, &servinfo))!=0){ 00312 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); 00313 return 1; 00314 } 00315 00316 // loop through all the results and bind to the first we can 00317 for(p = servinfo; p != NULL; p = p->ai_next) { 00318 if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { 00319 perror("server: socket"); 00320 continue; 00321 } 00322 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { 00323 perror("setsockopt"); 00324 exit(1); 00325 } 00326 if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { 00327 close(sockfd); 00328 perror("server: bind"); 00329 continue; 00330 } 00331 break; 00332 } 00333 00334 if (p == NULL) { 00335 fprintf(stderr, "server: failed to bind\n"); 00336 return 2; 00337 } 00338 00339 freeaddrinfo(servinfo); // all done with this structure 00340 00341 if (listen(sockfd, BACKLOG) == -1) { 00342 perror("listen"); 00343 exit(1); 00344 } 00345 00346 sa.sa_handler = sigchld_handler; // reap all dead processes 00347 sigemptyset(&sa.sa_mask); 00348 sa.sa_flags = SA_RESTART; 00349 if (sigaction(SIGCHLD, &sa, NULL) == -1) { 00350 perror("sigaction"); 00351 exit(1); 00352 } 00353 00354 printf("server: waiting for connections...\n"); 00355 struct thread_data thread_arg[NUM_THREADS]; 00356 pthread_t th[NUM_THREADS]; 00357 00358 pthread_mutex_init(&mutexGet, NULL); 00359 00360 while(1){ 00361 sin_size = sizeof their_addr; 00362 new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size); 00363 if(new_fd==-1){ 00364 perror("accept"); 00365 continue; 00366 } 00367 inet_ntop(their_addr.ss_family, get_in_addr((struct sockaddr *)&their_addr), s, sizeof s); 00368 printf("server, got connection from %s\n", s); 00369 00370 int d = 0; 00371 pthread_mutex_lock(&mutexGet); 00372 for(d=0; d<NUM_THREADS; d++){ 00373 if(THREAD_OCCUPIED[d]==0) break; 00374 } 00375 00376 if(d==NUM_THREADS){ 00377 close(new_fd); 00378 pthread_mutex_unlock(&mutexGet); 00379 continue; 00380 } 00381 00382 THREAD_OCCUPIED[d] = 1; 00383 pthread_mutex_unlock(&mutexGet); 00384 00385 //receiving query from client, still need to be completed 00386 //only needs to receive three strings, 00387 //queryFile, searchdatasetFile, outputDir 00388 00389 string strSearchDataset; 00390 string strQuery; 00391 string strOutputDir; 00392 00393 //search parameters 00394 string strSearchParameter; 00395 //format: _ delimited 00396 //[0]: search_method ("RBP", "OrderStatistics", or "EqualWeighting") 00397 //[1]: rbp_p 00398 //[2]: minimum fraction of query required 00399 //[3]: minimum fraction of genome required 00400 //[4]: distance measure ("Correlation", "Zscore", or "ZscoreHubbinessCorrected") 00401 00402 if(CSeekNetwork::Receive(new_fd, strSearchDataset)==-1){ 00403 fprintf(stderr, "Error receiving from client!\n"); 00404 } 00405 00406 if(CSeekNetwork::Receive(new_fd, strQuery)==-1){ 00407 fprintf(stderr, "Error receiving from client!\n"); 00408 } 00409 00410 if(CSeekNetwork::Receive(new_fd, strOutputDir)==-1){ 00411 fprintf(stderr, "Error receiving from client!\n"); 00412 } 00413 00414 if(CSeekNetwork::Receive(new_fd, strSearchParameter)==-1){ 00415 fprintf(stderr, "Error receiving from client!\n"); 00416 } 00417 00418 vector<string> searchParameterTokens; 00419 fprintf(stderr, "%s\n", strSearchParameter.c_str()); 00420 CMeta::Tokenize(strSearchParameter.c_str(), searchParameterTokens, "_"); 00421 thread_arg[d].searchMethod = searchParameterTokens[0]; 00422 thread_arg[d].rbp_p = atof(searchParameterTokens[1].c_str()); 00423 thread_arg[d].query_fraction_required = 00424 atof(searchParameterTokens[2].c_str()); 00425 thread_arg[d].genome_fraction_required = 00426 atof(searchParameterTokens[3].c_str()); 00427 thread_arg[d].distanceMeasure = searchParameterTokens[4]; 00428 00429 //========================================================= 00430 00431 thread_arg[d].threadid = d; 00432 thread_arg[d].new_fd = new_fd; 00433 thread_arg[d].strQuery = strQuery; 00434 thread_arg[d].strOutputDir = strOutputDir; 00435 thread_arg[d].strSearchDatasets = strSearchDataset; 00436 //thread_arg[d].csfinal = &csfinal; 00437 00438 int ret; 00439 pthread_create(&th[d], NULL, do_query, (void*) &thread_arg[d]); 00440 00441 //int ret; 00442 //pthread_create(&th[d], NULL, do_query, (void *) &thread_arg[d]); 00443 //pthread_join(th[d], (void **)&ret); 00444 00445 } 00446 00447 #ifdef WIN32 00448 pthread_win32_process_detach_np( ); 00449 #endif // WIN32 00450 return 0; 00451 00452 }