Sleipnir
tools/SeekPValue/SeekPValue.cpp
00001 /*****************************************************************************
00002 * This file is provided under the Creative Commons Attribution 3.0 license.
00003 *
00004 * You are free to share, copy, distribute, transmit, or adapt this work
00005 * PROVIDED THAT you attribute the work to the authors listed below.
00006 * For more information, please see the following web page:
00007 * http://creativecommons.org/licenses/by/3.0/
00008 *
00009 * This file is a component of the Sleipnir library for functional genomics,
00010 * authored by:
00011 * Curtis Huttenhower (chuttenh@princeton.edu)
00012 * Mark Schroeder
00013 * Maria D. Chikina
00014 * Olga G. Troyanskaya (ogt@princeton.edu, primary contact)
00015 *
00016 * If you use this library, the included executable tools, or any related
00017 * code in your work, please cite the following publication:
00018 * Curtis Huttenhower, Mark Schroeder, Maria D. Chikina, and
00019 * Olga G. Troyanskaya.
00020 * "The Sleipnir library for computational functional genomics"
00021 *****************************************************************************/
00022 #include "stdafx.h"
00023 #include "cmdline.h"
00024 
00025 #define BACKLOG 10   // how many pending connections queue will hold
00026 char *PORT;
00027 
00028 pthread_mutex_t mutexGet;
00029 
00030 map<string, int> mapstrintGene;
00031 vector<string> vecstrGenes;
00032 vector<string> vecstrGeneID;
00033 vector<vector<int> > randomRank;
00034 vector<vector<float> > randomSc;
00035 vector<int> querySize;
00036 
00037 int numGenes;
00038 
00039 void sigchld_handler(int s){
00040     while(waitpid(-1, NULL, WNOHANG) > 0);
00041 }
00042 
00043 // get sockaddr, IPv4 or IPv6:
00044 void *get_in_addr(struct sockaddr *sa){
00045     if (sa->sa_family == AF_INET) {
00046         return &(((struct sockaddr_in*)sa)->sin_addr);
00047     }
00048     return &(((struct sockaddr_in6*)sa)->sin6_addr);
00049 }
00050 
00051 #define NUM_THREADS 8
00052 char THREAD_OCCUPIED[NUM_THREADS];
00053 
00054 struct thread_data{
00055     vector<string> query;
00056     vector<float> gene_score;
00057 
00058     int mode; //0 - p-value on rank, 1 - p-value on score
00059     float nan;
00060     int threadid;
00061     int new_fd;
00062 };
00063 
00064 void *do_query(void *th_arg){
00065     struct thread_data *my = (struct thread_data *) th_arg;
00066     int new_fd = my->new_fd;
00067     int threadid = my->threadid;
00068     float nan = my->nan;
00069     int mode = my->mode;
00070 
00071     vector<string> queryGenes = my->query;
00072     vector<float> geneScores = my->gene_score;
00073 
00074     size_t i, j, jj, k;
00075     //nan = sArgs.nan_arg;
00076     vector<AResultFloat> sortedGenes;
00077     sortedGenes.resize(geneScores.size());
00078     for(i=0; i<sortedGenes.size(); i++){
00079         sortedGenes[i].i = i;
00080         sortedGenes[i].f = geneScores[i];
00081     }
00082 
00083     vector<int> queryGeneID;
00084     for(i=0; i<queryGenes.size(); i++)
00085         queryGeneID.push_back(mapstrintGene[queryGenes[i]]);
00086     //Query genes themselves have lowest score, to prevent
00087     //them from being counted in PR
00088     for(i=0; i<queryGeneID.size(); i++)
00089         sortedGenes[queryGeneID[i]].f = nan;
00090 
00091     sort(sortedGenes.begin(), sortedGenes.end());
00092 
00093     //comparison
00094     vector<int> geneRank;
00095     geneRank.resize(numGenes);
00096     for(jj=0; jj<numGenes; jj++){
00097         geneRank[sortedGenes[jj].i] = jj;
00098     }
00099 
00100     vector<float> pval;
00101     CSeekTools::InitVector(pval, geneScores.size(), (float) nan);   
00102 
00103     for(jj=0; jj<geneScores.size(); jj++){
00104         int gene = sortedGenes[jj].i;
00105         int gene_rank = jj;
00106         float gene_score = sortedGenes[jj].f;
00107         if(gene_score==nan) break;
00108         //if(gene_score<0) 
00109         //  continue;
00110         vector<int> &rR = randomRank[gene];
00111         vector<float> &rF = randomSc[gene];
00112         int kk = 0;
00113         if(mode==1){
00114             if(gene_score>=0){
00115                 for(kk=0; kk<rF.size(); kk++){
00116                     if(gene_score>=rF[kk] || kk==rF.size()-1)
00117                         pval[gene] = (float) kk / (float) rF.size();
00118                         //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(),
00119                         //gene_rank, kk, gene_score, randomSc[gene][kk]);
00120                     if(gene_score>=rF[kk])
00121                         break;
00122                 }
00123             }else{
00124                 for(kk=rF.size()-1; kk>=0; kk--){
00125                     if(gene_score<=rF[kk] || kk==0)
00126                         pval[gene] = (float) (rF.size()-1-kk) / (float) rF.size();
00127                         //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(),
00128                         //gene_rank, rF.size()-1-kk, gene_score, randomSc[gene][kk]);
00129                     if(gene_score<=rF[kk])
00130                         break;
00131                 }
00132             }
00133         }else if(mode==0){
00134             if(gene_rank<17600/2){
00135                 for(kk=0; kk<rR.size(); kk++){
00136                     if(gene_rank<=rR[kk] || kk==rR.size()-1)
00137                         pval[gene] = (float) kk / (float) rR.size();
00138                         //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(),
00139                         //gene_rank, kk, gene_score, randomSc[gene][kk]);
00140                     if(gene_rank<=rR[kk])
00141                         break;
00142                 }
00143             }else{
00144                 for(kk=rR.size()-1; kk>=0; kk--){
00145                     if(gene_rank>=rR[kk] || kk==0)
00146                         pval[gene] = (float) (rR.size()-1-kk) / (float) rF.size();
00147                         //fprintf(stderr, "%s\t%d\t%d\t%.5e\t%.5e\n", vecstrGenes[gene].c_str(),
00148                         //gene_rank, rR.size()-1-kk, gene_score, randomSc[gene][kk]);
00149                     if(gene_rank>=rR[kk])
00150                         break;
00151                 }
00152             }
00153         }
00154     }
00155 
00156     if(CSeekNetwork::Send(new_fd, pval)==-1){
00157         fprintf(stderr, "Error sending message to client!\n");
00158     }
00159 
00160     pthread_mutex_lock(&mutexGet);
00161     close(new_fd);
00162     THREAD_OCCUPIED[threadid] = 0;
00163     pthread_mutex_unlock(&mutexGet);
00164 
00165     int ret = 0;
00166     pthread_exit((void*)ret);
00167 }
00168 
00169 int main( int iArgs, char** aszArgs ) {
00170     static const size_t c_iBuffer   = 1024;
00171 #ifdef WIN32
00172     pthread_win32_process_attach_np( );
00173 #endif // WIN32
00174     gengetopt_args_info sArgs;
00175 
00176     if( cmdline_parser( iArgs, aszArgs, &sArgs ) ) {
00177         cmdline_parser_print_help( );
00178         return 1; }
00179 
00180     signal(SIGPIPE, SIG_IGN);
00181     size_t i;
00182     for(i=0; i<NUM_THREADS; i++){
00183         THREAD_OCCUPIED[i] = 0;
00184     }
00185 
00186     PORT = sArgs.port_arg;
00187     float nan = sArgs.nan_arg;
00188 
00189     if(!CSeekTools::ReadListTwoColumns(sArgs.input_arg, vecstrGeneID, vecstrGenes))
00190         return false;
00191     for(i=0; i<vecstrGenes.size(); i++)
00192         mapstrintGene[vecstrGenes[i]] = (int) i;
00193 
00194     numGenes = vecstrGenes.size();
00195     
00196     string random_directory = sArgs.random_dir_arg;
00197     int num_random = sArgs.random_num_arg;
00198     int ii, jj;
00199     char ac[256];
00200 
00201     randomRank.resize(numGenes);
00202     randomSc.resize(numGenes);
00203     for(ii=0; ii<numGenes; ii++){
00204         randomRank[ii].resize(num_random);
00205         randomSc[ii].resize(num_random);
00206     }
00207 
00208     for(ii=0; ii<num_random; ii++){
00209         vector<float> randomScores;
00210         sprintf(ac, "%s/%d.gscore", random_directory.c_str(), ii);
00211         CSeekTools::ReadArray(ac, randomScores);
00212         /*vector<string> queryGenes;
00213         sprintf(ac, "%s/%d.query", random_directory.c_str(), ii);
00214         CSeekTools::ReadMultiGeneOneLine(ac, queryGenes);
00215         querySize.push_back(queryGenes.size());
00216         */
00217         vector<AResultFloat> sortedRandom;
00218         sortedRandom.resize(randomScores.size());
00219         for(jj=0; jj<randomScores.size(); jj++){
00220             sortedRandom[jj].i = jj;
00221             sortedRandom[jj].f = randomScores[jj];
00222         }
00223         sort(sortedRandom.begin(), sortedRandom.end());
00224         for(jj=0; jj<randomScores.size(); jj++){
00225             randomRank[sortedRandom[jj].i][ii] = jj;
00226             randomSc[sortedRandom[jj].i][ii] = sortedRandom[jj].f;
00227         }
00228     }
00229 
00230     for(jj=0; jj<numGenes; jj++){
00231         sort(randomRank[jj].begin(), randomRank[jj].end());
00232         sort(randomSc[jj].begin(), randomSc[jj].end(), std::greater<float>());
00233     }
00234 
00235     int sockfd, new_fd;
00236     struct addrinfo hints, *servinfo, *p;
00237     struct sockaddr_storage their_addr;
00238     socklen_t sin_size;
00239     struct sigaction sa;
00240     char s[INET6_ADDRSTRLEN];
00241     char buf[10];
00242     int rv;
00243     int yes = 1;
00244 
00245     memset(&hints, 0, sizeof(hints));
00246     hints.ai_family=AF_UNSPEC;
00247     hints.ai_socktype = SOCK_STREAM;
00248     hints.ai_flags = AI_PASSIVE;
00249 
00250     if((rv=getaddrinfo(NULL, PORT, &hints, &servinfo))!=0){
00251         fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
00252         return 1;
00253     }
00254 
00255     // loop through all the results and bind to the first we can
00256     for(p = servinfo; p != NULL; p = p->ai_next) {
00257         if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
00258             perror("server: socket");
00259             continue;
00260         }
00261         if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
00262             perror("setsockopt");
00263             exit(1);
00264         }
00265         if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
00266             close(sockfd);
00267             perror("server: bind");
00268             continue;
00269         }
00270         break;
00271     }
00272 
00273     if (p == NULL)  {
00274         fprintf(stderr, "server: failed to bind\n");
00275         return 2;
00276     }
00277 
00278     freeaddrinfo(servinfo); // all done with this structure
00279 
00280     if (listen(sockfd, BACKLOG) == -1) {
00281         perror("listen");
00282         exit(1);
00283     }
00284 
00285     sa.sa_handler = sigchld_handler; // reap all dead processes
00286     sigemptyset(&sa.sa_mask);
00287     sa.sa_flags = SA_RESTART;
00288     if (sigaction(SIGCHLD, &sa, NULL) == -1) {
00289         perror("sigaction");
00290         exit(1);
00291     }
00292 
00293     printf("server: waiting for connections...\n");
00294     struct thread_data thread_arg[NUM_THREADS];
00295     pthread_t th[NUM_THREADS];
00296 
00297     pthread_mutex_init(&mutexGet, NULL);
00298 
00299     while(1){
00300         sin_size = sizeof their_addr;
00301         new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size);
00302         if(new_fd==-1){
00303             perror("accept");
00304             continue;
00305         }
00306         inet_ntop(their_addr.ss_family, get_in_addr((struct sockaddr *)&their_addr), s, sizeof s);
00307         printf("server, got connection from %s\n", s);
00308     
00309         int d = 0;
00310         pthread_mutex_lock(&mutexGet);
00311         for(d=0; d<NUM_THREADS; d++){
00312             if(THREAD_OCCUPIED[d]==0) break;
00313         }
00314 
00315         if(d==NUM_THREADS){
00316             close(new_fd); 
00317             pthread_mutex_unlock(&mutexGet);
00318             continue;
00319         }
00320 
00321         THREAD_OCCUPIED[d] = 1;
00322         pthread_mutex_unlock(&mutexGet);
00323 
00324         string strQuery;
00325         vector<float> vf;
00326         vector<string> query;
00327         string strMode;
00328         int mode;
00329 
00330         if(CSeekNetwork::Receive(new_fd, strMode)==-1){
00331             fprintf(stderr, "Error receiving from client\n");
00332         }
00333 
00334         if(strMode=="rank") 
00335             mode = 0;
00336         else if(strMode=="score")
00337             mode = 1;
00338 
00339         if(CSeekNetwork::Receive(new_fd, strQuery)==-1){
00340             fprintf(stderr, "Error receiving from client!\n");
00341         }
00342 
00343         if(CSeekNetwork::Receive(new_fd, vf)==-1){
00344             fprintf(stderr, "Error receiving from client!\n");
00345         }
00346 
00347         CMeta::Tokenize(strQuery.c_str(), query, " ");
00348 
00349         //=========================================================
00350 
00351         thread_arg[d].threadid = d;
00352         thread_arg[d].new_fd = new_fd;
00353         thread_arg[d].query = query;
00354         thread_arg[d].gene_score = vf;
00355         thread_arg[d].nan = nan;
00356         thread_arg[d].mode = mode;
00357         int ret;
00358         pthread_create(&th[d], NULL, do_query, (void*) &thread_arg[d]);
00359 
00360     }
00361 
00362 #ifdef WIN32
00363     pthread_win32_process_detach_np( );
00364 #endif // WIN32
00365     return 0; 
00366 
00367 }