Sleipnir
tools/PCLServer/PCLServer.cpp
00001 /*****************************************************************************
00002 * This file is provided under the Creative Commons Attribution 3.0 license.
00003 *
00004 * You are free to share, copy, distribute, transmit, or adapt this work
00005 * PROVIDED THAT you attribute the work to the authors listed below.
00006 * For more information, please see the following web page:
00007 * http://creativecommons.org/licenses/by/3.0/
00008 *
00009 * This file is a component of the Sleipnir library for functional genomics,
00010 * authored by:
00011 * Curtis Huttenhower (chuttenh@princeton.edu)
00012 * Mark Schroeder
00013 * Maria D. Chikina
00014 * Olga G. Troyanskaya (ogt@princeton.edu, primary contact)
00015 *
00016 * If you use this library, the included executable tools, or any related
00017 * code in your work, please cite the following publication:
00018 * Curtis Huttenhower, Mark Schroeder, Maria D. Chikina, and
00019 * Olga G. Troyanskaya.
00020 * "The Sleipnir library for computational functional genomics"
00021 *****************************************************************************/
00022 #include "stdafx.h"
00023 #include "cmdline.h"
00024 #include <sys/socket.h>
00025 #include <sys/types.h>
00026 #include <netinet/in.h>
00027 #include <netdb.h>
00028 #include <arpa/inet.h>
00029 #include <sys/wait.h>
00030 #include <signal.h>
00031 #include <list>
00032 
00033 #define BACKLOG 10   // how many pending connections queue will hold
00034 char *PORT;
00035 int NUM_DSET_MEMORY = 100;
00036 string pcl_input_dir;
00037 
00038 CPCL **pcl;
00039 list<int> available;
00040 char *loaded;
00041 map<string, int> DNAME_MAP;
00042 map<int, int> return_val;
00043 map<int, string> DNAME_RMAP;
00044 
00045 pthread_mutex_t mutexGet;
00046 
00047 vector<CSeekDBSetting*> cc;
00048 
00049 /*string strPrepInputDirectory;
00050 string strSinfoInputDirectory;
00051 string strDatasetInputDirectory;
00052 string strPlatformInputDirectory;*/
00053 map<string, int> mapstrintGene;
00054 
00055 vector<string> vecstrGeneID;
00056 vector<string> vecstrGenes;
00057 vector<string> vecstrDatasets;
00058 vector<string> vecstrDP;
00059 
00060 map<string, int> mapstrintDatasetDB;
00061 map<string, utype> mapstriPlatform;
00062 map<string, string> mapstrstrDatasetPlatform;
00063 map<string, utype> mapstrintDataset;
00064 vector<string> vecstrPlatforms;
00065 vector<CSeekPlatform> vp;
00066 
00067 
00068 void sigchld_handler(int s){
00069     while(waitpid(-1, NULL, WNOHANG) > 0);
00070 }
00071 
00072 // get sockaddr, IPv4 or IPv6:
00073 void *get_in_addr(struct sockaddr *sa){
00074     if (sa->sa_family == AF_INET) {
00075         return &(((struct sockaddr_in*)sa)->sin_addr);
00076     }
00077     return &(((struct sockaddr_in6*)sa)->sin6_addr);
00078 }
00079 
00080 #define NUM_THREADS 8
00081 char THREAD_OCCUPIED[NUM_THREADS];
00082 
00083 int send_msg(int new_fd, char *c, int size){
00084     int r = send(new_fd, c, size, 0);
00085     if(r==-1){
00086         printf("client exists");
00087     }
00088     return r;
00089 }
00090 
00091 void cl(char *b, int size){
00092     int i = 0;
00093     for(i=0; i<size; i++){
00094         b[i] = '\0';
00095     }
00096 }
00097 
00098 struct thread_data{
00099     vector<string> datasetName;
00100     vector<string> geneName;
00101     vector<string> queryName;
00102     int threadid;
00103     int new_fd;
00104     float rbp_p;
00105     bool outputNormalized;
00106     bool outputCoexpression;
00107     bool outputQueryCoexpression;
00108     bool outputExpression;
00109     bool outputQueryExpression;
00110 };
00111 
00112 int cpy(char *d, char *s, int beg, int num){
00113     int i;
00114     for(i=0; i<num; i++){
00115         d[beg+i] = s[i];
00116     }
00117     return beg+num;
00118 }
00119 
00120 int GetOpenSlot(){
00121     int i = -1;
00122     int size = 0;
00123     while(size<available.size()){
00124         i = available.front();
00125         available.pop_front();
00126         available.push_back(i);
00127         if(loaded[i]==0) break;
00128         size++;
00129     }
00130     if(size==available.size()){
00131         return -1;
00132     }
00133     return i;
00134 }
00135 
00136 void *do_query(void *th_arg){
00137     struct thread_data *my = (struct thread_data *) th_arg;
00138     vector<string> datasetName = my->datasetName;
00139     vector<string> geneName = my->geneName;
00140     vector<string> queryName = my->queryName;
00141     bool outputNormalized = my->outputNormalized;
00142     bool outputCoexpression = my->outputCoexpression;
00143     bool outputQueryCoexpression = my->outputQueryCoexpression;
00144     bool outputExpression = my->outputExpression;
00145     bool outputQueryExpression = my->outputQueryExpression;
00146     int new_fd = my->new_fd;
00147     int tid = my->threadid;
00148     float RBP_P = my->rbp_p;
00149 
00150     /*
00151     cl(buf, 5000);
00152     sprintf(buf, "Begin...\n");
00153     if(send_msg(new_fd, buf, 5000)==-1){
00154         THREAD_OCCUPIED[tid] = 0;
00155         pthread_exit(0);
00156     }*/
00157     pthread_mutex_lock(&mutexGet);
00158 
00159     size_t i;
00160 
00161     vector<string>::const_iterator iterS = datasetName.begin();
00162     vector<CPCL*> vc;
00163 
00164     for(i=0; i<NUM_DSET_MEMORY; i++){
00165         loaded[i] = 0;
00166     }
00167 
00168     fprintf(stderr, "start processing...\n");
00169     for(; iterS!=datasetName.end(); iterS++){
00170         int n = -1;
00171         map<string, int>::const_iterator iterM = DNAME_MAP.find(*iterS);
00172         if(iterM!=DNAME_MAP.end()){
00173             n = iterM->second;
00174             fprintf(stderr, "found %d for dataset %s...\n", n, iterS->c_str());
00175             vc.push_back(pcl[n]);
00176             loaded[n] = 1;
00177             continue;
00178         }
00179         //pthread_mutex_lock(&mutexGet);
00180         n = GetOpenSlot();
00181         map<int, string>::const_iterator iterRM = DNAME_RMAP.find(n);
00182         if(iterRM!=DNAME_RMAP.end()){
00183             DNAME_MAP.erase(iterRM->second);
00184             DNAME_RMAP.erase(n);
00185         }
00186         DNAME_MAP[*iterS] = n;
00187         DNAME_RMAP[n] = *iterS;
00188         loaded[n] = 1;
00189         //pthread_mutex_unlock(&mutexGet);
00190 
00191         fprintf(stderr, "acquired %d for dataset %s...\n", n, iterS->c_str());
00192         //pcl[n]->Reset();
00193         //string strFileStem = (*iterS).substr(0, (*iterS).find(".bin"));
00194         fprintf(stderr, "dataset reset \n");
00195         string pcl_path = pcl_input_dir + "/" + *iterS;
00196         pcl[n]->Open(pcl_path.c_str());
00197         //pcl[n]->Open(strFileStem.c_str(), 2, false, false);
00198         fprintf(stderr, "dataset opened\n");
00199         vc.push_back(pcl[n]);
00200     }
00201 
00202     //vector<CFullMatrix<float> *> vC; //gene expression
00203     //vector<CFullMatrix<float> *> vQ; //query expression (if enabled) (EXTRA)
00204     //int totNumExperiments = 0;
00205     int genes = geneName.size();
00206     int queries = queryName.size(); //(EXTRA)
00207     int datasets = datasetName.size();
00208 
00209     vector<float> vecG, vecQ, vecCoexpression, vecqCoexpression;
00210     vector<float> sizeD;
00211     sizeD.resize(vc.size());
00212 
00213     vector< vector<float> > d_vecG, d_vecQ, d_vecCoexpression, d_vecqCoexpression;
00214     d_vecG.resize(vc.size());
00215     d_vecQ.resize(vc.size());
00216     d_vecCoexpression.resize(vc.size());
00217     d_vecqCoexpression.resize(vc.size());
00218 
00219     //Qian added
00220     //vector<CSeekDataset *> vd; //(EXTRA)
00221     //vd.resize(vc.size()); //(EXTRA)
00222     //CFullMatrix<float> *vCoexpression = new CFullMatrix<float>(); //(EXTRA)
00223     //vCoexpression->Initialize(genes, datasets); //(EXTRA)
00224 
00225     fprintf(stderr, "Reading data...\n");
00226 
00227     float NaN = -9999;  
00228     //for each dataset
00229 
00230     vector<float> quant;
00231 
00232     //QUANT file must be consistent
00233     CSeekTools::ReadQuantFile(cc[0]->GetValue("quant"), quant);
00234     //CSeekTools::ReadQuantFile("/home/qzhu/Seek/quant2", quant);
00235 
00236     #pragma omp parallel for \
00237     private(i) \
00238     firstprivate(genes, queries, datasets, outputCoexpression, outputQueryCoexpression, outputNormalized, outputExpression, \
00239     outputQueryExpression, NaN) \
00240     shared(pcl, vc, sizeD, datasetName, queryName, geneName, d_vecG, d_vecQ, d_vecCoexpression, d_vecqCoexpression, \
00241     mapstrintGene, mapstrstrDatasetPlatform, mapstriPlatform, vp) \
00242     schedule(dynamic)
00243     for(i=0; i<vc.size(); i++){
00244         CPCL *pp = vc[i];
00245         size_t j, k;
00246         //int ps = pp->GetExperiments() - 2;
00247         //int gs = pp->GetExperiments();
00248         int ps = pp->GetExperiments();
00249         int gs = pp->GetExperiments();
00250 
00251         CFullMatrix<float> *fq = NULL;
00252         
00253         CFullMatrix<float>* fq_RBP = NULL;
00254 
00255         CFullMatrix<float> *ff = NULL;
00256         vector<float> vCoexpression;
00257         vCoexpression.resize(genes);
00258         vector<float> vqCoexpression;
00259         vqCoexpression.resize(queryName.size());
00260 
00261         CSeekDataset *vd = NULL;
00262         CSeekPlatform *pl = NULL;
00263         sizeD[i] = (float)ps;
00264         d_vecG[i] = vector<float>();    
00265         d_vecQ[i] = vector<float>();    
00266         d_vecCoexpression[i] = vector<float>(); 
00267         d_vecqCoexpression[i] = vector<float>();    
00268 
00269         set<string> absent;
00270 
00271         if(outputCoexpression || outputQueryCoexpression){
00272             vd = new CSeekDataset();
00273             string strFileStem = datasetName[i].substr(0, datasetName[i].find(".bin"));
00274 
00275             int dbID = mapstrintDatasetDB[strFileStem];
00276     
00277             string strAvgPath = cc[dbID]->GetValue("prep") + "/" + strFileStem + ".gavg"; //avg and prep path share same directory
00278             string strPresencePath = cc[dbID]->GetValue("prep") + "/" + strFileStem + ".gpres";
00279             string strSinfoPath = cc[dbID]->GetValue("sinfo") + "/" + strFileStem + ".sinfo";
00280 
00281 
00282             vd->ReadGeneAverage(strAvgPath);
00283             vd->ReadGenePresence(strPresencePath);
00284             vd->ReadDatasetAverageStdev(strSinfoPath);
00285             vd->InitializeGeneMap();
00286 
00287             string strPlatform = mapstrstrDatasetPlatform.find(strFileStem)->second;
00288             utype platform_id = mapstriPlatform.find(strPlatform)->second;
00289             vd->SetPlatform(vp[platform_id]);
00290             pl = &vd->GetPlatform();
00291 
00292             fq = new CFullMatrix<float>();
00293             fq->Initialize(queryName.size(), ps);
00294 
00295             //NEW 3/5/2013====================================
00296             if(outputQueryCoexpression){
00297                 const vector<string> gNames = pp->GetGeneNames();
00298                 fq_RBP = new CFullMatrix<float>();
00299                 fq_RBP->Initialize(gNames.size(), ps);
00300                 //fprintf(stderr, "%d %d X1\n", i, gNames.size());
00301                 //#pragma omp parallel for \
00302                 shared(pp, fq_RBP) private(k, j) firstprivate(gs) \
00303                 schedule(dynamic)
00304                 /*for(k=0; k<gNames.size(); k++){
00305                     int g = pp->GetGene(gNames[k]);
00306                     float *vv = pp->Get(g);
00307                     for(j=0; j<gs; j++)
00308                         fq_RBP->Set(g, j, vv[j]);
00309                     float mean = 0;
00310                     for(j=0; j<gs; j++)
00311                         mean+=fq_RBP->Get(g, j);
00312                     mean/=(float) (gs);
00313                     float stdev = 0;
00314                     for(j=0; j<gs; j++)
00315                         stdev+=(fq_RBP->Get(g, j) - mean) * (fq_RBP->Get(g, j) - mean);
00316                     stdev/=(float) (gs);
00317                     if(stdev<=0){
00318                         absent.insert(gNames[k]);
00319                     }   
00320                     stdev = sqrt(stdev);
00321                     if(isnan(stdev) || isinf(stdev)){
00322                         fprintf(stderr, "Error:, standard deviation is zero\n");
00323                     }
00324                     for(j=0; j<gs; j++){
00325                         float t1 = fq_RBP->Get(g, j);
00326                         fq_RBP->Set(g, j, (t1 - mean) / stdev);
00327                         
00328                     }
00329                 }*/
00330                 //fprintf(stderr, "%d X2\n", i);
00331             }
00332             //====================================================
00333 
00334 
00335             if(outputQueryExpression){
00336             for(k=0; k<queryName.size(); k++){
00337                 int g = pp->GetGene(queryName[k]);
00338                 if(g==-1){ //does not contain the gene in the dataset
00339                     for(j=0; j<gs; j++){
00340                         fq->Set(k, j, NaN);
00341                         d_vecQ[i].push_back(fq->Get(k, j));
00342                     }
00343                     continue;
00344                 }
00345                 float *vv = pp->Get(g);
00346                 for(j=0; j<gs; j++)
00347                     fq->Set(k, j, vv[j]);
00348                 if(!outputNormalized){
00349                     for(j=0; j<gs; j++)
00350                         d_vecQ[i].push_back(fq->Get(k, j));
00351                 }
00352                 
00353                 //normalize
00354                 float mean = 0;
00355                 for(j=0; j<gs; j++)
00356                     mean+=fq->Get(k, j);
00357                 mean/=(float) (gs);
00358                 float stdev = 0;
00359                 for(j=0; j<gs; j++)
00360                     stdev+=(fq->Get(k, j) - mean) * (fq->Get(k, j) - mean);
00361                 stdev/=(float) (gs);
00362                 stdev = sqrt(stdev);
00363                 for(j=0; j<gs; j++){
00364                     float t1 = fq->Get(k, j);
00365                     fq->Set(k, j, (t1 - mean) / stdev);
00366                 }
00367 
00368                 if(outputNormalized){
00369                     for(j=0; j<gs; j++)
00370                         d_vecQ[i].push_back(fq->Get(k, j));
00371                 }
00372             }
00373             }
00374         }
00375 
00376         fprintf(stderr, "allocating space %d %d...\n", geneName.size(),
00377             ps);
00378         ff = new CFullMatrix<float>();
00379         ff->Initialize(genes, ps);
00380         fprintf(stderr, "done allocating space.\n");
00381 
00382         if(outputExpression){
00383         for(k=0; k<geneName.size(); k++){
00384             int g = pp->GetGene(geneName[k]);
00385             if(g==-1){
00386                 for(j=0; j<gs; j++){
00387                     ff->Set(k, j, NaN);
00388                     d_vecG[i].push_back(ff->Get(k, j));
00389                 }
00390                 continue;
00391             }
00392             float *vv = pp->Get(g);
00393             for(j=0; j<gs; j++)
00394                 ff->Set(k, j, vv[j]);
00395 
00396             if(!outputNormalized){
00397                 for(j=0; j<gs; j++){
00398                     d_vecG[i].push_back(ff->Get(k, j));
00399                 }
00400             }
00401             
00402             //normalize 
00403             float mean = 0;
00404             for(j=0; j<gs; j++)
00405                 mean+=ff->Get(k, j);
00406             mean/=(float) (gs);
00407             float stdev = 0;
00408             for(j=0; j<gs; j++)
00409                 stdev+=(ff->Get(k, j) - mean) * (ff->Get(k, j) - mean);
00410             stdev/=(float) (gs);
00411             stdev = sqrt(stdev);
00412             for(j=0; j<gs; j++){
00413                 float t1 = ff->Get(k, j);
00414                 ff->Set(k, j, (t1 - mean) / stdev);
00415             }
00416 
00417             if(outputNormalized){
00418                 for(j=0; j<gs; j++){
00419                     d_vecG[i].push_back(ff->Get(k, j));
00420                 }
00421             }
00422         }
00423         }
00424 
00425         if(outputCoexpression){
00426             int kk;
00427             CMeasurePearNorm pn;
00428             for(k=0; k<geneName.size(); k++){
00429                 int g = pp->GetGene(geneName[k]);
00430                 if(g==-1){
00431                     vCoexpression[k] = NaN;
00432                     d_vecCoexpression[i].push_back(vCoexpression[k]);
00433                     continue;
00434                 }
00435                 float avgP = 0;
00436                 int totalQueryPresent = queryName.size();
00437                 for(kk=0; kk<queryName.size(); kk++){
00438                     int gg = pp->GetGene(queryName[kk]);
00439                     if(gg==-1){
00440                         totalQueryPresent--;
00441                         continue;
00442                     }
00443 
00444                     float *x1 = NULL;
00445                     float *x2 = NULL;
00446                     if(g<gg){
00447                         x1 = pp->Get(g);
00448                         x2 = pp->Get(gg);
00449                     }else{
00450                         x1 = pp->Get(gg);
00451                         x2 = pp->Get(g);
00452                     }
00453 
00454                     float p = (float) pn.Measure(x1, ps, x2, ps, 
00455                         IMeasure::EMapCenter, NULL, NULL);  
00456 
00457                     p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev();
00458 
00459                     int gID = mapstrintGene[geneName[k]];
00460                     int qID = mapstrintGene[queryName[kk]];
00461 
00462                     int qb = CMeta::Quantize(p, quant);
00463                     p = quant[qb];
00464 
00465                     //fprintf(stderr, "Correlation %d %d %.5f %.5f %.5f %.5f\n", qID, gID, p, vd->GetGeneAverage(gID), 
00466                     //  pl->GetPlatformAvg(qID), pl->GetPlatformStdev(qID));
00467 
00468                     //subtract hubbiness
00469                     p = (p - vd->GetGeneAverage(gID) - pl->GetPlatformAvg(qID)) / pl->GetPlatformStdev(qID) ;
00470                     //p = p - vd->GetGeneAverage(gID);
00471                     p = max((float) min(p, (float) 3.2), (float)-3.2);
00472 
00473                     avgP+=p;
00474                 }
00475                 if(totalQueryPresent==0)
00476                     avgP = NaN;
00477                 else
00478                     avgP/=(float)(totalQueryPresent);
00479                 
00480                 vCoexpression[k] = avgP;
00481                 d_vecCoexpression[i].push_back(avgP);
00482             }
00483         }
00484 
00485         if(outputQueryCoexpression){
00486             int kk=0;
00487 
00488             const vector<string> gNames = pp->GetGeneNames();
00489             vector<char> qMap;
00490             CSeekTools::InitVector(qMap, vecstrGenes.size(), (char)0);
00491 
00492             int totQuery = 0;
00493             for(k=0; k<queryName.size(); k++){
00494                 int g = pp->GetGene(queryName[k]);
00495                 if(g==-1) continue;
00496                 int mg = mapstrintGene[queryName[k]];
00497                 qMap[mg] = 1;
00498                 totQuery++;
00499             }
00500 
00501             CMeasurePearNorm pn;
00502 
00503             for(k=0; k<queryName.size(); k++){
00504                 int g = pp->GetGene(queryName[k]);
00505                 if(g==-1){
00506                     vqCoexpression[k] = NaN;
00507                     d_vecqCoexpression[i].push_back(vqCoexpression[k]);
00508                     continue;
00509                 }
00510                 vector<AResult> ar;
00511                 ar.resize(gNames.size());
00512 
00513                 for(kk=0; kk<gNames.size(); kk++){
00514                     int gg = pp->GetGene(gNames[kk]);
00515                     ar[kk].i = (utype) mapstrintGene[gNames[kk]];
00516                     if(g==gg){
00517                         ar[kk].f = 0;
00518                         continue;
00519                     }
00520 
00521                     float *x1 = NULL;
00522                     float *x2 = NULL;
00523                     if(g<gg){
00524                         x1 = pp->Get(g);
00525                         x2 = pp->Get(gg);
00526                     }else{
00527                         x1 = pp->Get(gg);
00528                         x2 = pp->Get(g);
00529                     }
00530 
00531                     float p = (float) pn.Measure(x1, ps, x2, ps, 
00532                         IMeasure::EMapCenter, NULL, NULL);  
00533 
00534                     float px = p;
00535                     if(!(p<5.0 && p>-5.0)){
00536                         ar[kk].f = 0;
00537                         continue;
00538                     }
00539 
00540                     //get z-score (dataset wide)
00541                     p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev();
00542                     int gID = mapstrintGene[gNames[kk]];
00543                     int qID = mapstrintGene[queryName[k]];
00544 
00545                     int qb = CMeta::Quantize(p, quant);
00546                     p = quant[qb];
00547 
00548                     //fprintf(stderr, "Correlation %d %d %.5f %.5f %.5f %.5f\n", qID, gID, p, vd->GetGeneAverage(gID), 
00549                     //  pl->GetPlatformAvg(qID), pl->GetPlatformStdev(qID));
00550 
00551                     //subtract hubbiness
00552                     p = (p - vd->GetGeneAverage(gID) - pl->GetPlatformAvg(qID)) / pl->GetPlatformStdev(qID) ;
00553                     //p = p - vd->GetGeneAverage(gID);
00554                     p = max((float) min(p, (float) 3.2), (float)-3.2);
00555 
00556                     //fprintf(stderr, "%d error, infinite or nan! %.3f, %.3f, %.3f\n", i, 
00557                     //  vd->GetDatasetAverage(), vd->GetDatasetStdev(), 
00558                     //  vd->GetGeneAverage(mapstrintGene[gNames[kk]]));
00559                     //fprintf(stderr, "Correlation %d %d %.5f\n", qID, gID, p);
00560                     ar[kk].f = (utype) (p*100.0 + 320);
00561 
00562                 }
00563 
00564                 int TOP = 1000;
00565                 if(ar.size()<TOP){
00566                     TOP = ar.size();
00567                 }
00568                 //fprintf(stderr, "%d H2\n", i);
00569                 nth_element(ar.begin(), ar.begin()+TOP, ar.end());
00570                 sort(ar.begin(), ar.begin()+TOP);
00571                 //fprintf(stderr, "%d H3\n", i);
00572 
00573                 float rbp = 0;
00574                 //utype jj = 0;
00575                 for(kk=0; kk<TOP; kk++){
00576                     if(qMap[ar[kk].i]==0) continue;
00577                     if(ar[kk].f==0) break;
00578                     rbp+=pow(RBP_P, kk);
00579                     fprintf(stderr, "Sorted %d %d %d %.5f\n", i, kk, ar[kk].i, (ar[kk].f-320)/100.0f);
00580                     //jj++;
00581                     //fprintf(stderr, "Good %d %d\n", i, kk);
00582                     //jj++;
00583                 }
00584                 rbp *= (1.0 - RBP_P);
00585             
00586                 rbp = rbp / totQuery * 1000;
00587                 fprintf(stderr, "%d %.3e\n", i, rbp);
00588                 vqCoexpression[k] = rbp;
00589                 d_vecqCoexpression[i].push_back(rbp);
00590             }
00591             
00592             
00593             /*for(k=0; k<queryName.size(); k++){
00594                 int g = pp->GetGene(queryName[k]);
00595                 if(g==-1){
00596                     vqCoexpression[k] = NaN;
00597                     vecqCoexpression.push_back(vqCoexpression[k]);
00598                     continue;
00599                 }
00600                 float avgP = 0;
00601                 int totalQueryPresent = queryName.size() - 1;
00602 
00603                 for(kk=0; kk<queryName.size(); kk++){
00604                     if(kk==k) continue;
00605                     int gg = pp->GetGene(queryName[kk]);
00606                     if(gg==-1){
00607                         totalQueryPresent--;
00608                         continue;
00609                     }
00610 
00611                     float *x1 = NULL;
00612                     float *x2 = NULL;
00613                     if(g<gg){
00614                         x1 = pp->Get(g);
00615                         x2 = pp->Get(gg);
00616                     }else{
00617                         x1 = pp->Get(gg);
00618                         x2 = pp->Get(g);
00619                     }
00620 
00621                     float p = (float) pn.Measure(x1, ps, x2, ps, 
00622                         IMeasure::EMapCenter, NULL, NULL);  
00623                     float px = p;
00624 
00625                     if(!(p<5.0 && p>-5.0)){
00626                         continue;
00627                     }
00628 
00629                     //get z-score (dataset wide)
00630                     p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev();
00631                     int gID = mapstrintGene[queryName[kk]];
00632                     int qID = mapstrintGene[queryName[k]];
00633 
00634                     int qb = CMeta::Quantize(p, quant);
00635                     p = quant[qb];
00636 
00637                     //fprintf(stderr, "Correlation %d %d %.5f %.5f %.5f %.5f\n", qID, gID, p, vd->GetGeneAverage(gID), 
00638                     //  pl->GetPlatformAvg(qID), pl->GetPlatformStdev(qID));
00639 
00640                     //subtract hubbiness
00641                     p = (p - vd->GetGeneAverage(gID) - pl->GetPlatformAvg(qID)) / pl->GetPlatformStdev(qID) ;
00642                     //p = p - vd->GetGeneAverage(gID);
00643                     p = max((float) min(p, (float) 3.2), (float)-3.2);
00644 
00645                     //float p = 0;
00646                     //for(j=2; j<gs; j++)
00647                     //  p+= fq->Get(k, j-2)*fq->Get(kk, j-2);
00648                     //p/=(float)(gs-2);
00649                     //p = 0.5 * log((1.0+p)/(1.0-p));
00650                     //p = max((float) min(p, (float) 3.2), (float)-3.2);
00651                     //get z-score (dataset wide)
00652                     //p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev();
00653                     //subtract hubbiness
00654                     //p = p - vd->GetGeneAverage(mapstrintGene[queryName[kk]]);
00655                     
00656 
00657                     avgP+=p;
00658                 }
00659                 if(totalQueryPresent==0)
00660                     avgP = NaN;
00661                 else
00662                     avgP/=(float)(totalQueryPresent);
00663                 
00664                 vqCoexpression[k] = avgP;
00665                 d_vecqCoexpression[i].push_back(avgP);
00666             }*/
00667             
00668 
00669 
00670         }
00671         
00672         if(outputCoexpression || outputQueryCoexpression){
00673             delete vd;
00674             delete fq;
00675             if(outputQueryCoexpression)
00676                 delete fq_RBP;
00677         }
00678 
00679         delete ff;
00680     }
00681 
00682     for(i=0; i<vc.size(); i++){
00683         vecG.insert(vecG.end(), d_vecG[i].begin(), d_vecG[i].end());
00684         vecQ.insert(vecQ.end(), d_vecQ[i].begin(), d_vecQ[i].end());
00685         vecCoexpression.insert(vecCoexpression.end(), 
00686             d_vecCoexpression[i].begin(), d_vecCoexpression[i].end());
00687         vecqCoexpression.insert(vecqCoexpression.end(), 
00688             d_vecqCoexpression[i].begin(), d_vecqCoexpression[i].end());
00689     }
00690 
00691 
00692     if(CSeekNetwork::Send(new_fd, sizeD)!=0){
00693         fprintf(stderr, "Error sending messages\n");
00694     }
00695 
00696     if(outputExpression){
00697         if(CSeekNetwork::Send(new_fd, vecG)!=0){
00698             fprintf(stderr, "Error sending messages\n");
00699         }
00700     }
00701 
00702     if(outputQueryExpression){
00703         if(CSeekNetwork::Send(new_fd, vecQ)!=0){
00704             fprintf(stderr, "Error sending messages\n");
00705         }
00706     }
00707 
00708     if(outputCoexpression){
00709         if(CSeekNetwork::Send(new_fd, vecCoexpression)!=0){
00710             fprintf(stderr, "Error sending messages\n");
00711         }
00712     }
00713 
00714     if(outputQueryCoexpression){
00715         if(CSeekNetwork::Send(new_fd, vecqCoexpression)!=0){
00716             fprintf(stderr, "Error sending messages\n");
00717         }
00718     }
00719 
00720     for(i=0; i<NUM_DSET_MEMORY; i++){
00721         loaded[i] = 0;
00722     }
00723 
00724     THREAD_OCCUPIED[tid] = 0;
00725 
00726     pthread_mutex_unlock(&mutexGet);
00727 
00728     int ret = 0;
00729     close(new_fd);
00730     pthread_exit((void*)ret);
00731     //return void;
00732 }
00733 
00734 int main( int iArgs, char** aszArgs ) {
00735     static const size_t c_iBuffer   = 1024;
00736 #ifdef WIN32
00737     pthread_win32_process_attach_np( );
00738 #endif // WIN32
00739 
00740     gengetopt_args_info sArgs;
00741 
00742     if( cmdline_parser( iArgs, aszArgs, &sArgs ) ) {
00743         cmdline_parser_print_help( );
00744         return 1;
00745     }
00746 
00747     signal(SIGPIPE, SIG_IGN);
00748     size_t i, j;
00749     for(i=0; i<NUM_THREADS; i++){
00750         THREAD_OCCUPIED[i] = 0;
00751     }
00752 
00753     PORT = sArgs.port_arg;
00754 
00755     CSeekDBSetting *dbSetting = new CSeekDBSetting(
00756         "NA", //default gvar arg, argument not needed for PCLServer
00757         sArgs.sinfo_arg, sArgs.platform_arg, sArgs.prep_arg,
00758         ".",  //default DB arg, argument not needed for PCLServer
00759         sArgs.gene_arg, sArgs.quant_arg, sArgs.dset_arg,
00760         21702 //default num_db arg, argument not needed for PCLServer
00761     );
00762 
00763     //vector<CSeekDBSetting*> cc;
00764     cc.push_back(dbSetting);
00765 
00766     pcl_input_dir = sArgs.input_arg;
00767 
00768     string add_db = sArgs.additional_db_arg;
00769     if(add_db!="NA"){
00770         ifstream ifsm;
00771         ifsm.open(add_db.c_str());
00772         const int lineSize =1024;
00773         if(!ifsm.is_open()){
00774             fprintf(stderr, "Error opening file %s\n", add_db.c_str());
00775             return false;
00776         }
00777         char acBuffer[lineSize];
00778         utype c_iBuffer = lineSize;
00779         vector<map<string,string> > parameters; //an array of CDatabase's
00780         while(!ifsm.eof()){
00781             ifsm.getline(acBuffer, c_iBuffer-1);
00782             if(acBuffer[0]==0) break;
00783             acBuffer[c_iBuffer-1]=0;
00784             string strB = acBuffer;
00785             if(strB=="START"){
00786                 map<string,string> p;
00787                 while(!ifsm.eof()){
00788                     ifsm.getline(acBuffer, c_iBuffer-1);
00789                     if(acBuffer[0]==0){
00790                         fprintf(stderr, "Invalid line (empty)\n");
00791                         return 1;
00792                     }
00793                     strB = acBuffer;
00794                     if(strB=="END") break;
00795                     vector<string> tok;
00796                     CMeta::Tokenize(acBuffer, tok); //separator is tab
00797                     p[tok[0]] = tok[1];
00798                 }
00799                 parameters.push_back(p);
00800             }
00801         }
00802         ifsm.close();
00803         if(parameters.size()==0){
00804             fprintf(stderr, "Error, extra_db setting file must begin with START and end with END lines\n");
00805             return 1;
00806         }
00807 
00808         for(i=0; i<parameters.size(); i++){
00809         string sinfo_dir = "NA";
00810         string gvar_dir = "NA";
00811         string platform_dir = "NA";
00812         string prep_dir = "NA";
00813         string db_dir = "NA";
00814         string dset_map_file = "NA";
00815         string gene_map_file = "NA";
00816         string quant_file = "NA";
00817         int num_db = -1;
00818 
00819         if(parameters[i].find("SINFO_DIR")->second=="NA"){
00820             fprintf(stderr, "Please specify an sinfo directory for the extra db\n");
00821             return false;
00822         }
00823         sinfo_dir = parameters[i].find("SINFO_DIR")->second;
00824         if(parameters[i].find("GVAR_DIR")!=parameters[i].end())
00825             gvar_dir = parameters[i].find("GVAR_DIR")->second;
00826         if(parameters[i].find("PREP_DIR")==parameters[i].end() ||
00827             parameters[i].find("PLATFORM_DIR")==parameters[i].end() ||
00828             parameters[i].find("DB_DIR")==parameters[i].end() ||
00829             parameters[i].find("DSET_MAP_FILE")==parameters[i].end() ||
00830             parameters[i].find("GENE_MAP_FILE")==parameters[i].end() ||
00831             parameters[i].find("QUANT_FILE")==parameters[i].end() ||
00832             parameters[i].find("NUMBER_OF_DB")==parameters[i].end()){
00833             fprintf(stderr, "Some arguments are missing. Please make sure the following are provided:\n");
00834             fprintf(stderr, "PREP_DIR, DB_DIR, DSET_MAP_FILE, GENE_MAP_FILE, QUANT_FILE, NUMBER_OF_DB\n");
00835             return false;
00836         }
00837 
00838         platform_dir = parameters[i].find("PLATFORM_DIR")->second;
00839         db_dir = parameters[i].find("DB_DIR")->second;
00840         prep_dir = parameters[i].find("PREP_DIR")->second;
00841         dset_map_file = parameters[i].find("DSET_MAP_FILE")->second;
00842         gene_map_file = parameters[i].find("GENE_MAP_FILE")->second;
00843         quant_file = parameters[i].find("QUANT_FILE")->second;
00844         num_db = atoi(parameters[i].find("NUMBER_OF_DB")->second.c_str());
00845 
00846         CSeekDBSetting *dbSetting2 = new CSeekDBSetting(gvar_dir, sinfo_dir,
00847             platform_dir, prep_dir, db_dir, gene_map_file, quant_file, dset_map_file,
00848             num_db);
00849         cc.push_back(dbSetting2);
00850         }
00851     }
00852     
00853     if(!CSeekTools::ReadListTwoColumns(sArgs.gene_arg, vecstrGeneID, vecstrGenes))
00854         return false;
00855     for(i=0; i<vecstrGenes.size(); i++)
00856         mapstrintGene[vecstrGenes[i]] = (int) i;
00857 
00858     for(i=0; i<cc.size(); i++){
00859         vector<string> vD, vDP;
00860         if(!CSeekTools::ReadListTwoColumns(cc[i]->GetValue("dset"), vD, vDP))
00861             return false;
00862         for(j=0; j<vD.size(); j++){
00863             vecstrDatasets.push_back(vD[j]);
00864             vecstrDP.push_back(vDP[j]);
00865             mapstrintDatasetDB[vD[j]] = (int) i;            
00866         }
00867         vector<string> vP;
00868         map<string,utype> mP;
00869         vector<CSeekPlatform> vpx;
00870         CSeekTools::ReadPlatforms(cc[i]->GetValue("platform"), vpx, vP, mP);
00871         int cur=vp.size();
00872         for(map<string,utype>::iterator it=mP.begin();
00873             it!=mP.end(); it++){
00874             mapstriPlatform[it->first] = it->second+cur;
00875         }
00876         vp.resize(cur+vpx.size());
00877         for(j=0; j<vpx.size(); j++)
00878             vp[cur+j].Copy(vpx[j]);
00879     }   
00880 
00881     for(i=0; i<vecstrDatasets.size(); i++){
00882         mapstrstrDatasetPlatform[vecstrDatasets[i]] = vecstrDP[i];
00883         mapstrintDataset[vecstrDatasets[i]] = i;
00884     }
00885 
00886     //==================================================
00887 
00888 
00889     int sockfd, new_fd;
00890     struct addrinfo hints, *servinfo, *p;
00891     struct sockaddr_storage their_addr;
00892     socklen_t sin_size;
00893     struct sigaction sa;
00894     char s[INET6_ADDRSTRLEN];
00895     char buf[10];
00896     int rv;
00897     int yes = 1;
00898 
00899     memset(&hints, 0, sizeof(hints));
00900     hints.ai_family=AF_UNSPEC;
00901     hints.ai_socktype = SOCK_STREAM;
00902     hints.ai_flags = AI_PASSIVE;
00903 
00904     if((rv=getaddrinfo(NULL, PORT, &hints, &servinfo))!=0){
00905         fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
00906         return 1;
00907     }
00908 
00909     // loop through all the results and bind to the first we can
00910     for(p = servinfo; p != NULL; p = p->ai_next) {
00911         if ((sockfd = socket(p->ai_family, p->ai_socktype,
00912             p->ai_protocol)) == -1) {
00913             perror("server: socket");
00914             continue;
00915         }
00916         if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
00917             sizeof(int)) == -1) {
00918             perror("setsockopt");
00919             exit(1);
00920         }
00921         if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
00922             close(sockfd);
00923             perror("server: bind");
00924             continue;
00925         }
00926         break;
00927     }
00928 
00929     if (p == NULL)  {
00930         fprintf(stderr, "server: failed to bind\n");
00931         return 2;
00932     }
00933 
00934     freeaddrinfo(servinfo); // all done with this structure
00935 
00936     if (listen(sockfd, BACKLOG) == -1) {
00937         perror("listen");
00938         exit(1);
00939     }
00940 
00941     sa.sa_handler = sigchld_handler; // reap all dead processes
00942     sigemptyset(&sa.sa_mask);
00943     sa.sa_flags = SA_RESTART;
00944     if (sigaction(SIGCHLD, &sa, NULL) == -1) {
00945         perror("sigaction");
00946         exit(1);
00947     }
00948 
00949     printf("server: waiting for connections...\n");
00950     struct thread_data thread_arg[NUM_THREADS];
00951     pthread_t th[NUM_THREADS];
00952     int d = 0;
00953 
00954     pthread_mutex_init(&mutexGet, NULL);
00955 
00956     fprintf(stderr, "Start init.\n");
00957 
00958     pcl = new CPCL*[NUM_DSET_MEMORY];
00959 
00960     loaded = new char[NUM_DSET_MEMORY];
00961 
00962     available.clear();
00963     for(i=0; i<NUM_DSET_MEMORY; i++){
00964         available.push_back(i);
00965         loaded[i] = 0;
00966         pcl[i] = new CPCL();
00967     }
00968 
00969     fprintf(stderr, "Finished initializations.\n");
00970     while(1){
00971         sin_size = sizeof their_addr;
00972         new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size);
00973         if(new_fd==-1){
00974             perror("accept");
00975             continue;
00976         }
00977         inet_ntop(their_addr.ss_family, get_in_addr(
00978             (struct sockaddr *)&their_addr), s, sizeof s);
00979         printf("server, got connection from %s\n", s);
00980         for(d=0; d<NUM_THREADS; d++){
00981             if(THREAD_OCCUPIED[d]==0) break;
00982         }
00983         if(d==NUM_THREADS){
00984             close(new_fd);
00985             continue;
00986         }
00987 
00988         THREAD_OCCUPIED[d] = 1;
00989         string mode;
00990 
00991         if(CSeekNetwork::Receive(new_fd, mode)!=0){
00992             fprintf(stderr, "Error: receiving message\n");
00993             close(new_fd);
00994             continue;
00995         }
00996 
00997         //mode, 5 digits
00998         //1 - output coexpression calculation (with gene hubbiness removed), 
00999         //require setting two groups of genes, one for query (1 or many), 
01000         //and one other gene (to calculate coexpression on)
01001         //2 - output gene-normalized expression instead of original expression
01002         //3 - output gene expression (can be normalized or unnormalized, depends on 2)
01003         //4 - output query expression
01004         //5 - output query coexpression (compared to 1, which is gene-to-query 
01005         //    coexpression. This is query-to-query coexpression)
01006 
01007         bool outputCoexpression = false;
01008         bool outputNormalized = false;
01009         bool outputExpression = false;
01010         bool outputQueryExpression = false;
01011         bool outputQueryCoexpression = false;
01012         if(mode[0]=='1') 
01013             outputCoexpression = true;
01014         if(mode[1]=='1') 
01015             outputNormalized = true;
01016         if(mode[2]=='1') 
01017             outputExpression = true;
01018         if(mode[3]=='1') 
01019             outputQueryExpression = true;
01020         if(mode[4]=='1') 
01021             outputQueryCoexpression = true;
01022 
01023         vector<string> dsetName, geneName, queryName;
01024         string qname, gname, dname;
01025         string strrbp; //-1: disabled, 0-0.9999
01026         float rbp_p = -1;
01027 
01028         if(outputCoexpression || outputQueryCoexpression){
01029             if( CSeekNetwork::Receive(new_fd, qname)!=0 ||
01030                 CSeekNetwork::Receive(new_fd, gname)!=0 ||
01031                 CSeekNetwork::Receive(new_fd, dname)!=0 ||
01032                 CSeekNetwork::Receive(new_fd, strrbp)!=0){
01033                 fprintf(stderr, "Error: receiving message\n");
01034                 close(new_fd);
01035                 continue;
01036             }
01037             CMeta::Tokenize(dname.c_str(), dsetName);
01038             CMeta::Tokenize(gname.c_str(), geneName);
01039             CMeta::Tokenize(qname.c_str(), queryName);
01040             rbp_p = atof(strrbp.c_str());
01041         }
01042         else{
01043             if( CSeekNetwork::Receive(new_fd, gname)!=0 ||
01044                 CSeekNetwork::Receive(new_fd, dname)!=0 ){
01045                 fprintf(stderr, "Error: receiving message\n");
01046                 close(new_fd);
01047                 continue;
01048             }
01049             CMeta::Tokenize(dname.c_str(), dsetName);
01050             CMeta::Tokenize(gname.c_str(), geneName);
01051         }
01052     
01053         thread_arg[d].threadid = d;
01054         thread_arg[d].new_fd = new_fd;
01055         thread_arg[d].geneName = geneName;
01056         thread_arg[d].datasetName = dsetName;
01057         thread_arg[d].queryName = queryName;
01058         thread_arg[d].outputNormalized = outputNormalized;
01059         thread_arg[d].outputCoexpression = outputCoexpression;
01060         thread_arg[d].outputExpression = outputExpression;
01061         thread_arg[d].outputQueryExpression = outputQueryExpression;
01062         thread_arg[d].outputQueryCoexpression = outputQueryCoexpression;
01063         thread_arg[d].rbp_p = rbp_p;
01064 
01065         fprintf(stderr, "Arguments: %d %d %s %s\n", d, new_fd, dname.c_str(), gname.c_str());
01066         if(outputCoexpression){
01067             fprintf(stderr, "Arguments: %s\n", qname.c_str());
01068         }
01069 
01070         int ret;
01071         pthread_create(&th[d], NULL, do_query, (void *) &thread_arg[d]);
01072         /*pthread_join(th[d], (void **)&ret);
01073         if(ret==0){
01074             close(new_fd);
01075         }*/
01076     }
01077 
01078 
01079 #ifdef WIN32
01080     pthread_win32_process_detach_np( );
01081 #endif // WIN32
01082     return 0; }