Sleipnir
|
00001 /***************************************************************************** 00002 * This file is provided under the Creative Commons Attribution 3.0 license. 00003 * 00004 * You are free to share, copy, distribute, transmit, or adapt this work 00005 * PROVIDED THAT you attribute the work to the authors listed below. 00006 * For more information, please see the following web page: 00007 * http://creativecommons.org/licenses/by/3.0/ 00008 * 00009 * This file is a component of the Sleipnir library for functional genomics, 00010 * authored by: 00011 * Curtis Huttenhower (chuttenh@princeton.edu) 00012 * Mark Schroeder 00013 * Maria D. Chikina 00014 * Olga G. Troyanskaya (ogt@princeton.edu, primary contact) 00015 * 00016 * If you use this library, the included executable tools, or any related 00017 * code in your work, please cite the following publication: 00018 * Curtis Huttenhower, Mark Schroeder, Maria D. Chikina, and 00019 * Olga G. Troyanskaya. 00020 * "The Sleipnir library for computational functional genomics" 00021 *****************************************************************************/ 00022 #include "stdafx.h" 00023 #include "cmdline.h" 00024 00025 #define BACKLOG 10 // how many pending connections queue will hold 00026 char *PORT; 00027 00028 pthread_mutex_t mutexGet; 00029 00030 map<string, int> mapstrintGene; 00031 vector<string> vecstrGenes; 00032 vector<string> vecstrGeneID; 00033 vector<vector<int> > randomRank; 00034 vector<vector<float> > randomSc; 00035 vector<int> querySize; 00036 00037 int numGenes; 00038 00039 void sigchld_handler(int s){ 00040 while(waitpid(-1, NULL, WNOHANG) > 0); 00041 } 00042 00043 // get sockaddr, IPv4 or IPv6: 00044 void *get_in_addr(struct sockaddr *sa){ 00045 if (sa->sa_family == AF_INET) { 00046 return &(((struct sockaddr_in*)sa)->sin_addr); 00047 } 00048 return &(((struct sockaddr_in6*)sa)->sin6_addr); 00049 } 00050 00051 #define NUM_THREADS 8 00052 char THREAD_OCCUPIED[NUM_THREADS]; 00053 00054 struct thread_data{ 00055 vector<string> query; 00056 vector<float> gene_score; 00057 00058 int mode; //0 - p-value on rank, 1 - p-value on score 00059 float nan; 00060 int threadid; 00061 int new_fd; 00062 }; 00063 00064 void *do_query(void *th_arg){ 00065 struct thread_data *my = (struct thread_data *) th_arg; 00066 int new_fd = my->new_fd; 00067 int threadid = my->threadid; 00068 float nan = my->nan; 00069 int mode = my->mode; 00070 00071 vector<string> queryGenes = my->query; 00072 vector<float> geneScores = my->gene_score; 00073 00074 size_t i, j, jj, k; 00075 //nan = sArgs.nan_arg; 00076 vector<AResultFloat> sortedGenes; 00077 sortedGenes.resize(geneScores.size()); 00078 for(i=0; i<sortedGenes.size(); i++){ 00079 sortedGenes[i].i = i; 00080 sortedGenes[i].f = geneScores[i]; 00081 } 00082 00083 vector<int> queryGeneID; 00084 for(i=0; i<queryGenes.size(); i++) 00085 queryGeneID.push_back(mapstrintGene[queryGenes[i]]); 00086 //Query genes themselves have lowest score, to prevent 00087 //them from being counted in PR 00088 for(i=0; i<queryGeneID.size(); i++) 00089 sortedGenes[queryGeneID[i]].f = nan; 00090 00091 sort(sortedGenes.begin(), sortedGenes.end()); 00092 00093 //comparison 00094 vector<int> geneRank; 00095 geneRank.resize(numGenes); 00096 for(jj=0; jj<numGenes; jj++){ 00097 geneRank[sortedGenes[jj].i] = jj; 00098 } 00099 00100 vector<float> pval; 00101 CSeekTools::InitVector(pval, geneScores.size(), (float) nan); 00102 00103 for(jj=0; jj<geneScores.size(); jj++){ 00104 int gene = sortedGenes[jj].i; 00105 int gene_rank = jj; 00106 float gene_score = sortedGenes[jj].f; 00107 if(gene_score==nan) break; 00108 //if(gene_score<0) 00109 // continue; 00110 vector<int> &rR = randomRank[gene]; 00111 vector<float> &rF = randomSc[gene]; 00112 int kk = 0; 00113 if(mode==1){ 00114 if(gene_score>=0){ 00115 for(kk=0; kk<rF.size(); kk++){ 00116 if(gene_score>=rF[kk] || kk==rF.size()-1) 00117 pval[gene] = (float) kk / (float) rF.size(); 00118 //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(), 00119 //gene_rank, kk, gene_score, randomSc[gene][kk]); 00120 if(gene_score>=rF[kk]) 00121 break; 00122 } 00123 }else{ 00124 for(kk=rF.size()-1; kk>=0; kk--){ 00125 if(gene_score<=rF[kk] || kk==0) 00126 pval[gene] = (float) (rF.size()-1-kk) / (float) rF.size(); 00127 //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(), 00128 //gene_rank, rF.size()-1-kk, gene_score, randomSc[gene][kk]); 00129 if(gene_score<=rF[kk]) 00130 break; 00131 } 00132 } 00133 }else if(mode==0){ 00134 if(gene_rank<17600/2){ 00135 for(kk=0; kk<rR.size(); kk++){ 00136 if(gene_rank<=rR[kk] || kk==rR.size()-1) 00137 pval[gene] = (float) kk / (float) rR.size(); 00138 //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(), 00139 //gene_rank, kk, gene_score, randomSc[gene][kk]); 00140 if(gene_rank<=rR[kk]) 00141 break; 00142 } 00143 }else{ 00144 for(kk=rR.size()-1; kk>=0; kk--){ 00145 if(gene_rank>=rR[kk] || kk==0) 00146 pval[gene] = (float) (rR.size()-1-kk) / (float) rF.size(); 00147 //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(), 00148 //gene_rank, rR.size()-1-kk, gene_score, randomSc[gene][kk]); 00149 if(gene_rank>=rR[kk]) 00150 break; 00151 } 00152 } 00153 } 00154 } 00155 00156 if(CSeekNetwork::Send(new_fd, pval)==-1){ 00157 fprintf(stderr, "Error sending message to client!\n"); 00158 } 00159 00160 pthread_mutex_lock(&mutexGet); 00161 close(new_fd); 00162 THREAD_OCCUPIED[threadid] = 0; 00163 pthread_mutex_unlock(&mutexGet); 00164 00165 int ret = 0; 00166 pthread_exit((void*)ret); 00167 } 00168 00169 int main( int iArgs, char** aszArgs ) { 00170 static const size_t c_iBuffer = 1024; 00171 #ifdef WIN32 00172 pthread_win32_process_attach_np( ); 00173 #endif // WIN32 00174 gengetopt_args_info sArgs; 00175 00176 if( cmdline_parser( iArgs, aszArgs, &sArgs ) ) { 00177 cmdline_parser_print_help( ); 00178 return 1; } 00179 00180 signal(SIGPIPE, SIG_IGN); 00181 size_t i; 00182 for(i=0; i<NUM_THREADS; i++){ 00183 THREAD_OCCUPIED[i] = 0; 00184 } 00185 00186 PORT = sArgs.port_arg; 00187 float nan = sArgs.nan_arg; 00188 00189 if(!CSeekTools::ReadListTwoColumns(sArgs.input_arg, vecstrGeneID, vecstrGenes)) 00190 return false; 00191 for(i=0; i<vecstrGenes.size(); i++) 00192 mapstrintGene[vecstrGenes[i]] = (int) i; 00193 00194 numGenes = vecstrGenes.size(); 00195 00196 string random_directory = sArgs.random_dir_arg; 00197 int num_random = sArgs.random_num_arg; 00198 int ii, jj; 00199 char ac[256]; 00200 00201 randomRank.resize(numGenes); 00202 randomSc.resize(numGenes); 00203 for(ii=0; ii<numGenes; ii++){ 00204 randomRank[ii].resize(num_random); 00205 randomSc[ii].resize(num_random); 00206 } 00207 00208 for(ii=0; ii<num_random; ii++){ 00209 vector<float> randomScores; 00210 sprintf(ac, "%s/%d.gscore", random_directory.c_str(), ii); 00211 CSeekTools::ReadArray(ac, randomScores); 00212 /*vector<string> queryGenes; 00213 sprintf(ac, "%s/%d.query", random_directory.c_str(), ii); 00214 CSeekTools::ReadMultiGeneOneLine(ac, queryGenes); 00215 querySize.push_back(queryGenes.size()); 00216 */ 00217 vector<AResultFloat> sortedRandom; 00218 sortedRandom.resize(randomScores.size()); 00219 for(jj=0; jj<randomScores.size(); jj++){ 00220 sortedRandom[jj].i = jj; 00221 sortedRandom[jj].f = randomScores[jj]; 00222 } 00223 sort(sortedRandom.begin(), sortedRandom.end()); 00224 for(jj=0; jj<randomScores.size(); jj++){ 00225 randomRank[sortedRandom[jj].i][ii] = jj; 00226 randomSc[sortedRandom[jj].i][ii] = sortedRandom[jj].f; 00227 } 00228 } 00229 00230 for(jj=0; jj<numGenes; jj++){ 00231 sort(randomRank[jj].begin(), randomRank[jj].end()); 00232 sort(randomSc[jj].begin(), randomSc[jj].end(), std::greater<float>()); 00233 } 00234 00235 int sockfd, new_fd; 00236 struct addrinfo hints, *servinfo, *p; 00237 struct sockaddr_storage their_addr; 00238 socklen_t sin_size; 00239 struct sigaction sa; 00240 char s[INET6_ADDRSTRLEN]; 00241 char buf[10]; 00242 int rv; 00243 int yes = 1; 00244 00245 memset(&hints, 0, sizeof(hints)); 00246 hints.ai_family=AF_UNSPEC; 00247 hints.ai_socktype = SOCK_STREAM; 00248 hints.ai_flags = AI_PASSIVE; 00249 00250 if((rv=getaddrinfo(NULL, PORT, &hints, &servinfo))!=0){ 00251 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); 00252 return 1; 00253 } 00254 00255 // loop through all the results and bind to the first we can 00256 for(p = servinfo; p != NULL; p = p->ai_next) { 00257 if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { 00258 perror("server: socket"); 00259 continue; 00260 } 00261 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { 00262 perror("setsockopt"); 00263 exit(1); 00264 } 00265 if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { 00266 close(sockfd); 00267 perror("server: bind"); 00268 continue; 00269 } 00270 break; 00271 } 00272 00273 if (p == NULL) { 00274 fprintf(stderr, "server: failed to bind\n"); 00275 return 2; 00276 } 00277 00278 freeaddrinfo(servinfo); // all done with this structure 00279 00280 if (listen(sockfd, BACKLOG) == -1) { 00281 perror("listen"); 00282 exit(1); 00283 } 00284 00285 sa.sa_handler = sigchld_handler; // reap all dead processes 00286 sigemptyset(&sa.sa_mask); 00287 sa.sa_flags = SA_RESTART; 00288 if (sigaction(SIGCHLD, &sa, NULL) == -1) { 00289 perror("sigaction"); 00290 exit(1); 00291 } 00292 00293 printf("server: waiting for connections...\n"); 00294 struct thread_data thread_arg[NUM_THREADS]; 00295 pthread_t th[NUM_THREADS]; 00296 00297 pthread_mutex_init(&mutexGet, NULL); 00298 00299 while(1){ 00300 sin_size = sizeof their_addr; 00301 new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size); 00302 if(new_fd==-1){ 00303 perror("accept"); 00304 continue; 00305 } 00306 inet_ntop(their_addr.ss_family, get_in_addr((struct sockaddr *)&their_addr), s, sizeof s); 00307 printf("server, got connection from %s\n", s); 00308 00309 int d = 0; 00310 pthread_mutex_lock(&mutexGet); 00311 for(d=0; d<NUM_THREADS; d++){ 00312 if(THREAD_OCCUPIED[d]==0) break; 00313 } 00314 00315 if(d==NUM_THREADS){ 00316 close(new_fd); 00317 pthread_mutex_unlock(&mutexGet); 00318 continue; 00319 } 00320 00321 THREAD_OCCUPIED[d] = 1; 00322 pthread_mutex_unlock(&mutexGet); 00323 00324 string strQuery; 00325 vector<float> vf; 00326 vector<string> query; 00327 string strMode; 00328 int mode; 00329 00330 if(CSeekNetwork::Receive(new_fd, strMode)==-1){ 00331 fprintf(stderr, "Error receiving from client\n"); 00332 } 00333 00334 if(strMode=="rank") 00335 mode = 0; 00336 else if(strMode=="score") 00337 mode = 1; 00338 00339 if(CSeekNetwork::Receive(new_fd, strQuery)==-1){ 00340 fprintf(stderr, "Error receiving from client!\n"); 00341 } 00342 00343 if(CSeekNetwork::Receive(new_fd, vf)==-1){ 00344 fprintf(stderr, "Error receiving from client!\n"); 00345 } 00346 00347 CMeta::Tokenize(strQuery.c_str(), query, " "); 00348 00349 //========================================================= 00350 00351 thread_arg[d].threadid = d; 00352 thread_arg[d].new_fd = new_fd; 00353 thread_arg[d].query = query; 00354 thread_arg[d].gene_score = vf; 00355 thread_arg[d].nan = nan; 00356 thread_arg[d].mode = mode; 00357 int ret; 00358 pthread_create(&th[d], NULL, do_query, (void*) &thread_arg[d]); 00359 00360 } 00361 00362 #ifdef WIN32 00363 pthread_win32_process_detach_np( ); 00364 #endif // WIN32 00365 return 0; 00366 00367 }