Sleipnir
|
00001 /***************************************************************************** 00002 * This file is provided under the Creative Commons Attribution 3.0 license. 00003 * 00004 * You are free to share, copy, distribute, transmit, or adapt this work 00005 * PROVIDED THAT you attribute the work to the authors listed below. 00006 * For more information, please see the following web page: 00007 * http://creativecommons.org/licenses/by/3.0/ 00008 * 00009 * This file is a component of the Sleipnir library for functional genomics, 00010 * authored by: 00011 * Curtis Huttenhower (chuttenh@princeton.edu) 00012 * Mark Schroeder 00013 * Maria D. Chikina 00014 * Olga G. Troyanskaya (ogt@princeton.edu, primary contact) 00015 * 00016 * If you use this library, the included executable tools, or any related 00017 * code in your work, please cite the following publication: 00018 * Curtis Huttenhower, Mark Schroeder, Maria D. Chikina, and 00019 * Olga G. Troyanskaya. 00020 * "The Sleipnir library for computational functional genomics" 00021 *****************************************************************************/ 00022 #include "stdafx.h" 00023 #include "cmdline.h" 00024 #include <sys/socket.h> 00025 #include <sys/types.h> 00026 #include <netinet/in.h> 00027 #include <netdb.h> 00028 #include <arpa/inet.h> 00029 #include <sys/wait.h> 00030 #include <signal.h> 00031 #include <list> 00032 00033 #define BACKLOG 10 // how many pending connections queue will hold 00034 char *PORT; 00035 int NUM_DSET_MEMORY = 100; 00036 string pcl_input_dir; 00037 00038 CPCL **pcl; 00039 list<int> available; 00040 char *loaded; 00041 map<string, int> DNAME_MAP; 00042 map<int, int> return_val; 00043 map<int, string> DNAME_RMAP; 00044 00045 pthread_mutex_t mutexGet; 00046 00047 vector<CSeekDBSetting*> cc; 00048 00049 /*string strPrepInputDirectory; 00050 string strSinfoInputDirectory; 00051 string strDatasetInputDirectory; 00052 string strPlatformInputDirectory;*/ 00053 map<string, int> mapstrintGene; 00054 00055 vector<string> vecstrGeneID; 00056 vector<string> vecstrGenes; 00057 vector<string> vecstrDatasets; 00058 vector<string> vecstrDP; 00059 00060 map<string, int> mapstrintDatasetDB; 00061 map<string, utype> mapstriPlatform; 00062 map<string, string> mapstrstrDatasetPlatform; 00063 map<string, utype> mapstrintDataset; 00064 vector<string> vecstrPlatforms; 00065 vector<CSeekPlatform> vp; 00066 00067 00068 void sigchld_handler(int s){ 00069 while(waitpid(-1, NULL, WNOHANG) > 0); 00070 } 00071 00072 // get sockaddr, IPv4 or IPv6: 00073 void *get_in_addr(struct sockaddr *sa){ 00074 if (sa->sa_family == AF_INET) { 00075 return &(((struct sockaddr_in*)sa)->sin_addr); 00076 } 00077 return &(((struct sockaddr_in6*)sa)->sin6_addr); 00078 } 00079 00080 #define NUM_THREADS 8 00081 char THREAD_OCCUPIED[NUM_THREADS]; 00082 00083 int send_msg(int new_fd, char *c, int size){ 00084 int r = send(new_fd, c, size, 0); 00085 if(r==-1){ 00086 printf("client exists"); 00087 } 00088 return r; 00089 } 00090 00091 void cl(char *b, int size){ 00092 int i = 0; 00093 for(i=0; i<size; i++){ 00094 b[i] = '\0'; 00095 } 00096 } 00097 00098 struct thread_data{ 00099 vector<string> datasetName; 00100 vector<string> geneName; 00101 vector<string> queryName; 00102 int threadid; 00103 int new_fd; 00104 float rbp_p; 00105 bool outputNormalized; 00106 bool outputCoexpression; 00107 bool outputQueryCoexpression; 00108 bool outputExpression; 00109 bool outputQueryExpression; 00110 }; 00111 00112 int cpy(char *d, char *s, int beg, int num){ 00113 int i; 00114 for(i=0; i<num; i++){ 00115 d[beg+i] = s[i]; 00116 } 00117 return beg+num; 00118 } 00119 00120 int GetOpenSlot(){ 00121 int i = -1; 00122 int size = 0; 00123 while(size<available.size()){ 00124 i = available.front(); 00125 available.pop_front(); 00126 available.push_back(i); 00127 if(loaded[i]==0) break; 00128 size++; 00129 } 00130 if(size==available.size()){ 00131 return -1; 00132 } 00133 return i; 00134 } 00135 00136 void *do_query(void *th_arg){ 00137 struct thread_data *my = (struct thread_data *) th_arg; 00138 vector<string> datasetName = my->datasetName; 00139 vector<string> geneName = my->geneName; 00140 vector<string> queryName = my->queryName; 00141 bool outputNormalized = my->outputNormalized; 00142 bool outputCoexpression = my->outputCoexpression; 00143 bool outputQueryCoexpression = my->outputQueryCoexpression; 00144 bool outputExpression = my->outputExpression; 00145 bool outputQueryExpression = my->outputQueryExpression; 00146 int new_fd = my->new_fd; 00147 int tid = my->threadid; 00148 float RBP_P = my->rbp_p; 00149 00150 /* 00151 cl(buf, 5000); 00152 sprintf(buf, "Begin...\n"); 00153 if(send_msg(new_fd, buf, 5000)==-1){ 00154 THREAD_OCCUPIED[tid] = 0; 00155 pthread_exit(0); 00156 }*/ 00157 pthread_mutex_lock(&mutexGet); 00158 00159 size_t i; 00160 00161 vector<string>::const_iterator iterS = datasetName.begin(); 00162 vector<CPCL*> vc; 00163 00164 for(i=0; i<NUM_DSET_MEMORY; i++){ 00165 loaded[i] = 0; 00166 } 00167 00168 fprintf(stderr, "start processing...\n"); 00169 for(; iterS!=datasetName.end(); iterS++){ 00170 int n = -1; 00171 map<string, int>::const_iterator iterM = DNAME_MAP.find(*iterS); 00172 if(iterM!=DNAME_MAP.end()){ 00173 n = iterM->second; 00174 fprintf(stderr, "found %d for dataset %s...\n", n, iterS->c_str()); 00175 vc.push_back(pcl[n]); 00176 loaded[n] = 1; 00177 continue; 00178 } 00179 //pthread_mutex_lock(&mutexGet); 00180 n = GetOpenSlot(); 00181 map<int, string>::const_iterator iterRM = DNAME_RMAP.find(n); 00182 if(iterRM!=DNAME_RMAP.end()){ 00183 DNAME_MAP.erase(iterRM->second); 00184 DNAME_RMAP.erase(n); 00185 } 00186 DNAME_MAP[*iterS] = n; 00187 DNAME_RMAP[n] = *iterS; 00188 loaded[n] = 1; 00189 //pthread_mutex_unlock(&mutexGet); 00190 00191 fprintf(stderr, "acquired %d for dataset %s...\n", n, iterS->c_str()); 00192 //pcl[n]->Reset(); 00193 //string strFileStem = (*iterS).substr(0, (*iterS).find(".bin")); 00194 fprintf(stderr, "dataset reset \n"); 00195 string pcl_path = pcl_input_dir + "/" + *iterS; 00196 pcl[n]->Open(pcl_path.c_str()); 00197 //pcl[n]->Open(strFileStem.c_str(), 2, false, false); 00198 fprintf(stderr, "dataset opened\n"); 00199 vc.push_back(pcl[n]); 00200 } 00201 00202 //vector<CFullMatrix<float> *> vC; //gene expression 00203 //vector<CFullMatrix<float> *> vQ; //query expression (if enabled) (EXTRA) 00204 //int totNumExperiments = 0; 00205 int genes = geneName.size(); 00206 int queries = queryName.size(); //(EXTRA) 00207 int datasets = datasetName.size(); 00208 00209 vector<float> vecG, vecQ, vecCoexpression, vecqCoexpression; 00210 vector<float> sizeD; 00211 sizeD.resize(vc.size()); 00212 00213 vector< vector<float> > d_vecG, d_vecQ, d_vecCoexpression, d_vecqCoexpression; 00214 d_vecG.resize(vc.size()); 00215 d_vecQ.resize(vc.size()); 00216 d_vecCoexpression.resize(vc.size()); 00217 d_vecqCoexpression.resize(vc.size()); 00218 00219 //Qian added 00220 //vector<CSeekDataset *> vd; //(EXTRA) 00221 //vd.resize(vc.size()); //(EXTRA) 00222 //CFullMatrix<float> *vCoexpression = new CFullMatrix<float>(); //(EXTRA) 00223 //vCoexpression->Initialize(genes, datasets); //(EXTRA) 00224 00225 fprintf(stderr, "Reading data...\n"); 00226 00227 float NaN = -9999; 00228 //for each dataset 00229 00230 vector<float> quant; 00231 00232 //QUANT file must be consistent 00233 CSeekTools::ReadQuantFile(cc[0]->GetValue("quant"), quant); 00234 //CSeekTools::ReadQuantFile("/home/qzhu/Seek/quant2", quant); 00235 00236 #pragma omp parallel for \ 00237 private(i) \ 00238 firstprivate(genes, queries, datasets, outputCoexpression, outputQueryCoexpression, outputNormalized, outputExpression, \ 00239 outputQueryExpression, NaN) \ 00240 shared(pcl, vc, sizeD, datasetName, queryName, geneName, d_vecG, d_vecQ, d_vecCoexpression, d_vecqCoexpression, \ 00241 mapstrintGene, mapstrstrDatasetPlatform, mapstriPlatform, vp) \ 00242 schedule(dynamic) 00243 for(i=0; i<vc.size(); i++){ 00244 CPCL *pp = vc[i]; 00245 size_t j, k; 00246 //int ps = pp->GetExperiments() - 2; 00247 //int gs = pp->GetExperiments(); 00248 int ps = pp->GetExperiments(); 00249 int gs = pp->GetExperiments(); 00250 00251 CFullMatrix<float> *fq = NULL; 00252 00253 CFullMatrix<float>* fq_RBP = NULL; 00254 00255 CFullMatrix<float> *ff = NULL; 00256 vector<float> vCoexpression; 00257 vCoexpression.resize(genes); 00258 vector<float> vqCoexpression; 00259 vqCoexpression.resize(queryName.size()); 00260 00261 CSeekDataset *vd = NULL; 00262 CSeekPlatform *pl = NULL; 00263 sizeD[i] = (float)ps; 00264 d_vecG[i] = vector<float>(); 00265 d_vecQ[i] = vector<float>(); 00266 d_vecCoexpression[i] = vector<float>(); 00267 d_vecqCoexpression[i] = vector<float>(); 00268 00269 set<string> absent; 00270 00271 if(outputCoexpression || outputQueryCoexpression){ 00272 vd = new CSeekDataset(); 00273 string strFileStem = datasetName[i].substr(0, datasetName[i].find(".bin")); 00274 00275 int dbID = mapstrintDatasetDB[strFileStem]; 00276 00277 string strAvgPath = cc[dbID]->GetValue("prep") + "/" + strFileStem + ".gavg"; //avg and prep path share same directory 00278 string strPresencePath = cc[dbID]->GetValue("prep") + "/" + strFileStem + ".gpres"; 00279 string strSinfoPath = cc[dbID]->GetValue("sinfo") + "/" + strFileStem + ".sinfo"; 00280 00281 00282 vd->ReadGeneAverage(strAvgPath); 00283 vd->ReadGenePresence(strPresencePath); 00284 vd->ReadDatasetAverageStdev(strSinfoPath); 00285 vd->InitializeGeneMap(); 00286 00287 string strPlatform = mapstrstrDatasetPlatform.find(strFileStem)->second; 00288 utype platform_id = mapstriPlatform.find(strPlatform)->second; 00289 vd->SetPlatform(vp[platform_id]); 00290 pl = &vd->GetPlatform(); 00291 00292 fq = new CFullMatrix<float>(); 00293 fq->Initialize(queryName.size(), ps); 00294 00295 //NEW 3/5/2013==================================== 00296 if(outputQueryCoexpression){ 00297 const vector<string> gNames = pp->GetGeneNames(); 00298 fq_RBP = new CFullMatrix<float>(); 00299 fq_RBP->Initialize(gNames.size(), ps); 00300 //fprintf(stderr, "%d %d X1\n", i, gNames.size()); 00301 //#pragma omp parallel for \ 00302 shared(pp, fq_RBP) private(k, j) firstprivate(gs) \ 00303 schedule(dynamic) 00304 /*for(k=0; k<gNames.size(); k++){ 00305 int g = pp->GetGene(gNames[k]); 00306 float *vv = pp->Get(g); 00307 for(j=0; j<gs; j++) 00308 fq_RBP->Set(g, j, vv[j]); 00309 float mean = 0; 00310 for(j=0; j<gs; j++) 00311 mean+=fq_RBP->Get(g, j); 00312 mean/=(float) (gs); 00313 float stdev = 0; 00314 for(j=0; j<gs; j++) 00315 stdev+=(fq_RBP->Get(g, j) - mean) * (fq_RBP->Get(g, j) - mean); 00316 stdev/=(float) (gs); 00317 if(stdev<=0){ 00318 absent.insert(gNames[k]); 00319 } 00320 stdev = sqrt(stdev); 00321 if(isnan(stdev) || isinf(stdev)){ 00322 fprintf(stderr, "Error:, standard deviation is zero\n"); 00323 } 00324 for(j=0; j<gs; j++){ 00325 float t1 = fq_RBP->Get(g, j); 00326 fq_RBP->Set(g, j, (t1 - mean) / stdev); 00327 00328 } 00329 }*/ 00330 //fprintf(stderr, "%d X2\n", i); 00331 } 00332 //==================================================== 00333 00334 00335 if(outputQueryExpression){ 00336 for(k=0; k<queryName.size(); k++){ 00337 int g = pp->GetGene(queryName[k]); 00338 if(g==-1){ //does not contain the gene in the dataset 00339 for(j=0; j<gs; j++){ 00340 fq->Set(k, j, NaN); 00341 d_vecQ[i].push_back(fq->Get(k, j)); 00342 } 00343 continue; 00344 } 00345 float *vv = pp->Get(g); 00346 for(j=0; j<gs; j++) 00347 fq->Set(k, j, vv[j]); 00348 if(!outputNormalized){ 00349 for(j=0; j<gs; j++) 00350 d_vecQ[i].push_back(fq->Get(k, j)); 00351 } 00352 00353 //normalize 00354 float mean = 0; 00355 for(j=0; j<gs; j++) 00356 mean+=fq->Get(k, j); 00357 mean/=(float) (gs); 00358 float stdev = 0; 00359 for(j=0; j<gs; j++) 00360 stdev+=(fq->Get(k, j) - mean) * (fq->Get(k, j) - mean); 00361 stdev/=(float) (gs); 00362 stdev = sqrt(stdev); 00363 for(j=0; j<gs; j++){ 00364 float t1 = fq->Get(k, j); 00365 fq->Set(k, j, (t1 - mean) / stdev); 00366 } 00367 00368 if(outputNormalized){ 00369 for(j=0; j<gs; j++) 00370 d_vecQ[i].push_back(fq->Get(k, j)); 00371 } 00372 } 00373 } 00374 } 00375 00376 fprintf(stderr, "allocating space %d %d...\n", geneName.size(), 00377 ps); 00378 ff = new CFullMatrix<float>(); 00379 ff->Initialize(genes, ps); 00380 fprintf(stderr, "done allocating space.\n"); 00381 00382 if(outputExpression){ 00383 for(k=0; k<geneName.size(); k++){ 00384 int g = pp->GetGene(geneName[k]); 00385 if(g==-1){ 00386 for(j=0; j<gs; j++){ 00387 ff->Set(k, j, NaN); 00388 d_vecG[i].push_back(ff->Get(k, j)); 00389 } 00390 continue; 00391 } 00392 float *vv = pp->Get(g); 00393 for(j=0; j<gs; j++) 00394 ff->Set(k, j, vv[j]); 00395 00396 if(!outputNormalized){ 00397 for(j=0; j<gs; j++){ 00398 d_vecG[i].push_back(ff->Get(k, j)); 00399 } 00400 } 00401 00402 //normalize 00403 float mean = 0; 00404 for(j=0; j<gs; j++) 00405 mean+=ff->Get(k, j); 00406 mean/=(float) (gs); 00407 float stdev = 0; 00408 for(j=0; j<gs; j++) 00409 stdev+=(ff->Get(k, j) - mean) * (ff->Get(k, j) - mean); 00410 stdev/=(float) (gs); 00411 stdev = sqrt(stdev); 00412 for(j=0; j<gs; j++){ 00413 float t1 = ff->Get(k, j); 00414 ff->Set(k, j, (t1 - mean) / stdev); 00415 } 00416 00417 if(outputNormalized){ 00418 for(j=0; j<gs; j++){ 00419 d_vecG[i].push_back(ff->Get(k, j)); 00420 } 00421 } 00422 } 00423 } 00424 00425 if(outputCoexpression){ 00426 int kk; 00427 CMeasurePearNorm pn; 00428 for(k=0; k<geneName.size(); k++){ 00429 int g = pp->GetGene(geneName[k]); 00430 if(g==-1){ 00431 vCoexpression[k] = NaN; 00432 d_vecCoexpression[i].push_back(vCoexpression[k]); 00433 continue; 00434 } 00435 float avgP = 0; 00436 int totalQueryPresent = queryName.size(); 00437 for(kk=0; kk<queryName.size(); kk++){ 00438 int gg = pp->GetGene(queryName[kk]); 00439 if(gg==-1){ 00440 totalQueryPresent--; 00441 continue; 00442 } 00443 00444 float *x1 = NULL; 00445 float *x2 = NULL; 00446 if(g<gg){ 00447 x1 = pp->Get(g); 00448 x2 = pp->Get(gg); 00449 }else{ 00450 x1 = pp->Get(gg); 00451 x2 = pp->Get(g); 00452 } 00453 00454 float p = (float) pn.Measure(x1, ps, x2, ps, 00455 IMeasure::EMapCenter, NULL, NULL); 00456 00457 p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev(); 00458 00459 int gID = mapstrintGene[geneName[k]]; 00460 int qID = mapstrintGene[queryName[kk]]; 00461 00462 int qb = CMeta::Quantize(p, quant); 00463 p = quant[qb]; 00464 00465 //fprintf(stderr, "Correlation %d %d %.5f %.5f %.5f %.5f\n", qID, gID, p, vd->GetGeneAverage(gID), 00466 // pl->GetPlatformAvg(qID), pl->GetPlatformStdev(qID)); 00467 00468 //subtract hubbiness 00469 p = (p - vd->GetGeneAverage(gID) - pl->GetPlatformAvg(qID)) / pl->GetPlatformStdev(qID) ; 00470 //p = p - vd->GetGeneAverage(gID); 00471 p = max((float) min(p, (float) 3.2), (float)-3.2); 00472 00473 avgP+=p; 00474 } 00475 if(totalQueryPresent==0) 00476 avgP = NaN; 00477 else 00478 avgP/=(float)(totalQueryPresent); 00479 00480 vCoexpression[k] = avgP; 00481 d_vecCoexpression[i].push_back(avgP); 00482 } 00483 } 00484 00485 if(outputQueryCoexpression){ 00486 int kk=0; 00487 00488 const vector<string> gNames = pp->GetGeneNames(); 00489 vector<char> qMap; 00490 CSeekTools::InitVector(qMap, vecstrGenes.size(), (char)0); 00491 00492 int totQuery = 0; 00493 for(k=0; k<queryName.size(); k++){ 00494 int g = pp->GetGene(queryName[k]); 00495 if(g==-1) continue; 00496 int mg = mapstrintGene[queryName[k]]; 00497 qMap[mg] = 1; 00498 totQuery++; 00499 } 00500 00501 CMeasurePearNorm pn; 00502 00503 for(k=0; k<queryName.size(); k++){ 00504 int g = pp->GetGene(queryName[k]); 00505 if(g==-1){ 00506 vqCoexpression[k] = NaN; 00507 d_vecqCoexpression[i].push_back(vqCoexpression[k]); 00508 continue; 00509 } 00510 vector<AResult> ar; 00511 ar.resize(gNames.size()); 00512 00513 for(kk=0; kk<gNames.size(); kk++){ 00514 int gg = pp->GetGene(gNames[kk]); 00515 ar[kk].i = (utype) mapstrintGene[gNames[kk]]; 00516 if(g==gg){ 00517 ar[kk].f = 0; 00518 continue; 00519 } 00520 00521 float *x1 = NULL; 00522 float *x2 = NULL; 00523 if(g<gg){ 00524 x1 = pp->Get(g); 00525 x2 = pp->Get(gg); 00526 }else{ 00527 x1 = pp->Get(gg); 00528 x2 = pp->Get(g); 00529 } 00530 00531 float p = (float) pn.Measure(x1, ps, x2, ps, 00532 IMeasure::EMapCenter, NULL, NULL); 00533 00534 float px = p; 00535 if(!(p<5.0 && p>-5.0)){ 00536 ar[kk].f = 0; 00537 continue; 00538 } 00539 00540 //get z-score (dataset wide) 00541 p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev(); 00542 int gID = mapstrintGene[gNames[kk]]; 00543 int qID = mapstrintGene[queryName[k]]; 00544 00545 int qb = CMeta::Quantize(p, quant); 00546 p = quant[qb]; 00547 00548 //fprintf(stderr, "Correlation %d %d %.5f %.5f %.5f %.5f\n", qID, gID, p, vd->GetGeneAverage(gID), 00549 // pl->GetPlatformAvg(qID), pl->GetPlatformStdev(qID)); 00550 00551 //subtract hubbiness 00552 p = (p - vd->GetGeneAverage(gID) - pl->GetPlatformAvg(qID)) / pl->GetPlatformStdev(qID) ; 00553 //p = p - vd->GetGeneAverage(gID); 00554 p = max((float) min(p, (float) 3.2), (float)-3.2); 00555 00556 //fprintf(stderr, "%d error, infinite or nan! %.3f, %.3f, %.3f\n", i, 00557 // vd->GetDatasetAverage(), vd->GetDatasetStdev(), 00558 // vd->GetGeneAverage(mapstrintGene[gNames[kk]])); 00559 //fprintf(stderr, "Correlation %d %d %.5f\n", qID, gID, p); 00560 ar[kk].f = (utype) (p*100.0 + 320); 00561 00562 } 00563 00564 int TOP = 1000; 00565 if(ar.size()<TOP){ 00566 TOP = ar.size(); 00567 } 00568 //fprintf(stderr, "%d H2\n", i); 00569 nth_element(ar.begin(), ar.begin()+TOP, ar.end()); 00570 sort(ar.begin(), ar.begin()+TOP); 00571 //fprintf(stderr, "%d H3\n", i); 00572 00573 float rbp = 0; 00574 //utype jj = 0; 00575 for(kk=0; kk<TOP; kk++){ 00576 if(qMap[ar[kk].i]==0) continue; 00577 if(ar[kk].f==0) break; 00578 rbp+=pow(RBP_P, kk); 00579 fprintf(stderr, "Sorted %d %d %d %.5f\n", i, kk, ar[kk].i, (ar[kk].f-320)/100.0f); 00580 //jj++; 00581 //fprintf(stderr, "Good %d %d\n", i, kk); 00582 //jj++; 00583 } 00584 rbp *= (1.0 - RBP_P); 00585 00586 rbp = rbp / totQuery * 1000; 00587 fprintf(stderr, "%d %.3e\n", i, rbp); 00588 vqCoexpression[k] = rbp; 00589 d_vecqCoexpression[i].push_back(rbp); 00590 } 00591 00592 00593 /*for(k=0; k<queryName.size(); k++){ 00594 int g = pp->GetGene(queryName[k]); 00595 if(g==-1){ 00596 vqCoexpression[k] = NaN; 00597 vecqCoexpression.push_back(vqCoexpression[k]); 00598 continue; 00599 } 00600 float avgP = 0; 00601 int totalQueryPresent = queryName.size() - 1; 00602 00603 for(kk=0; kk<queryName.size(); kk++){ 00604 if(kk==k) continue; 00605 int gg = pp->GetGene(queryName[kk]); 00606 if(gg==-1){ 00607 totalQueryPresent--; 00608 continue; 00609 } 00610 00611 float *x1 = NULL; 00612 float *x2 = NULL; 00613 if(g<gg){ 00614 x1 = pp->Get(g); 00615 x2 = pp->Get(gg); 00616 }else{ 00617 x1 = pp->Get(gg); 00618 x2 = pp->Get(g); 00619 } 00620 00621 float p = (float) pn.Measure(x1, ps, x2, ps, 00622 IMeasure::EMapCenter, NULL, NULL); 00623 float px = p; 00624 00625 if(!(p<5.0 && p>-5.0)){ 00626 continue; 00627 } 00628 00629 //get z-score (dataset wide) 00630 p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev(); 00631 int gID = mapstrintGene[queryName[kk]]; 00632 int qID = mapstrintGene[queryName[k]]; 00633 00634 int qb = CMeta::Quantize(p, quant); 00635 p = quant[qb]; 00636 00637 //fprintf(stderr, "Correlation %d %d %.5f %.5f %.5f %.5f\n", qID, gID, p, vd->GetGeneAverage(gID), 00638 // pl->GetPlatformAvg(qID), pl->GetPlatformStdev(qID)); 00639 00640 //subtract hubbiness 00641 p = (p - vd->GetGeneAverage(gID) - pl->GetPlatformAvg(qID)) / pl->GetPlatformStdev(qID) ; 00642 //p = p - vd->GetGeneAverage(gID); 00643 p = max((float) min(p, (float) 3.2), (float)-3.2); 00644 00645 //float p = 0; 00646 //for(j=2; j<gs; j++) 00647 // p+= fq->Get(k, j-2)*fq->Get(kk, j-2); 00648 //p/=(float)(gs-2); 00649 //p = 0.5 * log((1.0+p)/(1.0-p)); 00650 //p = max((float) min(p, (float) 3.2), (float)-3.2); 00651 //get z-score (dataset wide) 00652 //p = (p - vd->GetDatasetAverage()) / vd->GetDatasetStdev(); 00653 //subtract hubbiness 00654 //p = p - vd->GetGeneAverage(mapstrintGene[queryName[kk]]); 00655 00656 00657 avgP+=p; 00658 } 00659 if(totalQueryPresent==0) 00660 avgP = NaN; 00661 else 00662 avgP/=(float)(totalQueryPresent); 00663 00664 vqCoexpression[k] = avgP; 00665 d_vecqCoexpression[i].push_back(avgP); 00666 }*/ 00667 00668 00669 00670 } 00671 00672 if(outputCoexpression || outputQueryCoexpression){ 00673 delete vd; 00674 delete fq; 00675 if(outputQueryCoexpression) 00676 delete fq_RBP; 00677 } 00678 00679 delete ff; 00680 } 00681 00682 for(i=0; i<vc.size(); i++){ 00683 vecG.insert(vecG.end(), d_vecG[i].begin(), d_vecG[i].end()); 00684 vecQ.insert(vecQ.end(), d_vecQ[i].begin(), d_vecQ[i].end()); 00685 vecCoexpression.insert(vecCoexpression.end(), 00686 d_vecCoexpression[i].begin(), d_vecCoexpression[i].end()); 00687 vecqCoexpression.insert(vecqCoexpression.end(), 00688 d_vecqCoexpression[i].begin(), d_vecqCoexpression[i].end()); 00689 } 00690 00691 00692 if(CSeekNetwork::Send(new_fd, sizeD)!=0){ 00693 fprintf(stderr, "Error sending messages\n"); 00694 } 00695 00696 if(outputExpression){ 00697 if(CSeekNetwork::Send(new_fd, vecG)!=0){ 00698 fprintf(stderr, "Error sending messages\n"); 00699 } 00700 } 00701 00702 if(outputQueryExpression){ 00703 if(CSeekNetwork::Send(new_fd, vecQ)!=0){ 00704 fprintf(stderr, "Error sending messages\n"); 00705 } 00706 } 00707 00708 if(outputCoexpression){ 00709 if(CSeekNetwork::Send(new_fd, vecCoexpression)!=0){ 00710 fprintf(stderr, "Error sending messages\n"); 00711 } 00712 } 00713 00714 if(outputQueryCoexpression){ 00715 if(CSeekNetwork::Send(new_fd, vecqCoexpression)!=0){ 00716 fprintf(stderr, "Error sending messages\n"); 00717 } 00718 } 00719 00720 for(i=0; i<NUM_DSET_MEMORY; i++){ 00721 loaded[i] = 0; 00722 } 00723 00724 THREAD_OCCUPIED[tid] = 0; 00725 00726 pthread_mutex_unlock(&mutexGet); 00727 00728 int ret = 0; 00729 close(new_fd); 00730 pthread_exit((void*)ret); 00731 //return void; 00732 } 00733 00734 int main( int iArgs, char** aszArgs ) { 00735 static const size_t c_iBuffer = 1024; 00736 #ifdef WIN32 00737 pthread_win32_process_attach_np( ); 00738 #endif // WIN32 00739 00740 gengetopt_args_info sArgs; 00741 00742 if( cmdline_parser( iArgs, aszArgs, &sArgs ) ) { 00743 cmdline_parser_print_help( ); 00744 return 1; 00745 } 00746 00747 signal(SIGPIPE, SIG_IGN); 00748 size_t i, j; 00749 for(i=0; i<NUM_THREADS; i++){ 00750 THREAD_OCCUPIED[i] = 0; 00751 } 00752 00753 PORT = sArgs.port_arg; 00754 00755 CSeekDBSetting *dbSetting = new CSeekDBSetting( 00756 "NA", //default gvar arg, argument not needed for PCLServer 00757 sArgs.sinfo_arg, sArgs.platform_arg, sArgs.prep_arg, 00758 ".", //default DB arg, argument not needed for PCLServer 00759 sArgs.gene_arg, sArgs.quant_arg, sArgs.dset_arg, 00760 21702 //default num_db arg, argument not needed for PCLServer 00761 ); 00762 00763 //vector<CSeekDBSetting*> cc; 00764 cc.push_back(dbSetting); 00765 00766 pcl_input_dir = sArgs.input_arg; 00767 00768 string add_db = sArgs.additional_db_arg; 00769 if(add_db!="NA"){ 00770 ifstream ifsm; 00771 ifsm.open(add_db.c_str()); 00772 const int lineSize =1024; 00773 if(!ifsm.is_open()){ 00774 fprintf(stderr, "Error opening file %s\n", add_db.c_str()); 00775 return false; 00776 } 00777 char acBuffer[lineSize]; 00778 utype c_iBuffer = lineSize; 00779 vector<map<string,string> > parameters; //an array of CDatabase's 00780 while(!ifsm.eof()){ 00781 ifsm.getline(acBuffer, c_iBuffer-1); 00782 if(acBuffer[0]==0) break; 00783 acBuffer[c_iBuffer-1]=0; 00784 string strB = acBuffer; 00785 if(strB=="START"){ 00786 map<string,string> p; 00787 while(!ifsm.eof()){ 00788 ifsm.getline(acBuffer, c_iBuffer-1); 00789 if(acBuffer[0]==0){ 00790 fprintf(stderr, "Invalid line (empty)\n"); 00791 return 1; 00792 } 00793 strB = acBuffer; 00794 if(strB=="END") break; 00795 vector<string> tok; 00796 CMeta::Tokenize(acBuffer, tok); //separator is tab 00797 p[tok[0]] = tok[1]; 00798 } 00799 parameters.push_back(p); 00800 } 00801 } 00802 ifsm.close(); 00803 if(parameters.size()==0){ 00804 fprintf(stderr, "Error, extra_db setting file must begin with START and end with END lines\n"); 00805 return 1; 00806 } 00807 00808 for(i=0; i<parameters.size(); i++){ 00809 string sinfo_dir = "NA"; 00810 string gvar_dir = "NA"; 00811 string platform_dir = "NA"; 00812 string prep_dir = "NA"; 00813 string db_dir = "NA"; 00814 string dset_map_file = "NA"; 00815 string gene_map_file = "NA"; 00816 string quant_file = "NA"; 00817 int num_db = -1; 00818 00819 if(parameters[i].find("SINFO_DIR")->second=="NA"){ 00820 fprintf(stderr, "Please specify an sinfo directory for the extra db\n"); 00821 return false; 00822 } 00823 sinfo_dir = parameters[i].find("SINFO_DIR")->second; 00824 if(parameters[i].find("GVAR_DIR")!=parameters[i].end()) 00825 gvar_dir = parameters[i].find("GVAR_DIR")->second; 00826 if(parameters[i].find("PREP_DIR")==parameters[i].end() || 00827 parameters[i].find("PLATFORM_DIR")==parameters[i].end() || 00828 parameters[i].find("DB_DIR")==parameters[i].end() || 00829 parameters[i].find("DSET_MAP_FILE")==parameters[i].end() || 00830 parameters[i].find("GENE_MAP_FILE")==parameters[i].end() || 00831 parameters[i].find("QUANT_FILE")==parameters[i].end() || 00832 parameters[i].find("NUMBER_OF_DB")==parameters[i].end()){ 00833 fprintf(stderr, "Some arguments are missing. Please make sure the following are provided:\n"); 00834 fprintf(stderr, "PREP_DIR, DB_DIR, DSET_MAP_FILE, GENE_MAP_FILE, QUANT_FILE, NUMBER_OF_DB\n"); 00835 return false; 00836 } 00837 00838 platform_dir = parameters[i].find("PLATFORM_DIR")->second; 00839 db_dir = parameters[i].find("DB_DIR")->second; 00840 prep_dir = parameters[i].find("PREP_DIR")->second; 00841 dset_map_file = parameters[i].find("DSET_MAP_FILE")->second; 00842 gene_map_file = parameters[i].find("GENE_MAP_FILE")->second; 00843 quant_file = parameters[i].find("QUANT_FILE")->second; 00844 num_db = atoi(parameters[i].find("NUMBER_OF_DB")->second.c_str()); 00845 00846 CSeekDBSetting *dbSetting2 = new CSeekDBSetting(gvar_dir, sinfo_dir, 00847 platform_dir, prep_dir, db_dir, gene_map_file, quant_file, dset_map_file, 00848 num_db); 00849 cc.push_back(dbSetting2); 00850 } 00851 } 00852 00853 if(!CSeekTools::ReadListTwoColumns(sArgs.gene_arg, vecstrGeneID, vecstrGenes)) 00854 return false; 00855 for(i=0; i<vecstrGenes.size(); i++) 00856 mapstrintGene[vecstrGenes[i]] = (int) i; 00857 00858 for(i=0; i<cc.size(); i++){ 00859 vector<string> vD, vDP; 00860 if(!CSeekTools::ReadListTwoColumns(cc[i]->GetValue("dset"), vD, vDP)) 00861 return false; 00862 for(j=0; j<vD.size(); j++){ 00863 vecstrDatasets.push_back(vD[j]); 00864 vecstrDP.push_back(vDP[j]); 00865 mapstrintDatasetDB[vD[j]] = (int) i; 00866 } 00867 vector<string> vP; 00868 map<string,utype> mP; 00869 vector<CSeekPlatform> vpx; 00870 CSeekTools::ReadPlatforms(cc[i]->GetValue("platform"), vpx, vP, mP); 00871 int cur=vp.size(); 00872 for(map<string,utype>::iterator it=mP.begin(); 00873 it!=mP.end(); it++){ 00874 mapstriPlatform[it->first] = it->second+cur; 00875 } 00876 vp.resize(cur+vpx.size()); 00877 for(j=0; j<vpx.size(); j++) 00878 vp[cur+j].Copy(vpx[j]); 00879 } 00880 00881 for(i=0; i<vecstrDatasets.size(); i++){ 00882 mapstrstrDatasetPlatform[vecstrDatasets[i]] = vecstrDP[i]; 00883 mapstrintDataset[vecstrDatasets[i]] = i; 00884 } 00885 00886 //================================================== 00887 00888 00889 int sockfd, new_fd; 00890 struct addrinfo hints, *servinfo, *p; 00891 struct sockaddr_storage their_addr; 00892 socklen_t sin_size; 00893 struct sigaction sa; 00894 char s[INET6_ADDRSTRLEN]; 00895 char buf[10]; 00896 int rv; 00897 int yes = 1; 00898 00899 memset(&hints, 0, sizeof(hints)); 00900 hints.ai_family=AF_UNSPEC; 00901 hints.ai_socktype = SOCK_STREAM; 00902 hints.ai_flags = AI_PASSIVE; 00903 00904 if((rv=getaddrinfo(NULL, PORT, &hints, &servinfo))!=0){ 00905 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); 00906 return 1; 00907 } 00908 00909 // loop through all the results and bind to the first we can 00910 for(p = servinfo; p != NULL; p = p->ai_next) { 00911 if ((sockfd = socket(p->ai_family, p->ai_socktype, 00912 p->ai_protocol)) == -1) { 00913 perror("server: socket"); 00914 continue; 00915 } 00916 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, 00917 sizeof(int)) == -1) { 00918 perror("setsockopt"); 00919 exit(1); 00920 } 00921 if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { 00922 close(sockfd); 00923 perror("server: bind"); 00924 continue; 00925 } 00926 break; 00927 } 00928 00929 if (p == NULL) { 00930 fprintf(stderr, "server: failed to bind\n"); 00931 return 2; 00932 } 00933 00934 freeaddrinfo(servinfo); // all done with this structure 00935 00936 if (listen(sockfd, BACKLOG) == -1) { 00937 perror("listen"); 00938 exit(1); 00939 } 00940 00941 sa.sa_handler = sigchld_handler; // reap all dead processes 00942 sigemptyset(&sa.sa_mask); 00943 sa.sa_flags = SA_RESTART; 00944 if (sigaction(SIGCHLD, &sa, NULL) == -1) { 00945 perror("sigaction"); 00946 exit(1); 00947 } 00948 00949 printf("server: waiting for connections...\n"); 00950 struct thread_data thread_arg[NUM_THREADS]; 00951 pthread_t th[NUM_THREADS]; 00952 int d = 0; 00953 00954 pthread_mutex_init(&mutexGet, NULL); 00955 00956 fprintf(stderr, "Start init.\n"); 00957 00958 pcl = new CPCL*[NUM_DSET_MEMORY]; 00959 00960 loaded = new char[NUM_DSET_MEMORY]; 00961 00962 available.clear(); 00963 for(i=0; i<NUM_DSET_MEMORY; i++){ 00964 available.push_back(i); 00965 loaded[i] = 0; 00966 pcl[i] = new CPCL(); 00967 } 00968 00969 fprintf(stderr, "Finished initializations.\n"); 00970 while(1){ 00971 sin_size = sizeof their_addr; 00972 new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size); 00973 if(new_fd==-1){ 00974 perror("accept"); 00975 continue; 00976 } 00977 inet_ntop(their_addr.ss_family, get_in_addr( 00978 (struct sockaddr *)&their_addr), s, sizeof s); 00979 printf("server, got connection from %s\n", s); 00980 for(d=0; d<NUM_THREADS; d++){ 00981 if(THREAD_OCCUPIED[d]==0) break; 00982 } 00983 if(d==NUM_THREADS){ 00984 close(new_fd); 00985 continue; 00986 } 00987 00988 THREAD_OCCUPIED[d] = 1; 00989 string mode; 00990 00991 if(CSeekNetwork::Receive(new_fd, mode)!=0){ 00992 fprintf(stderr, "Error: receiving message\n"); 00993 close(new_fd); 00994 continue; 00995 } 00996 00997 //mode, 5 digits 00998 //1 - output coexpression calculation (with gene hubbiness removed), 00999 //require setting two groups of genes, one for query (1 or many), 01000 //and one other gene (to calculate coexpression on) 01001 //2 - output gene-normalized expression instead of original expression 01002 //3 - output gene expression (can be normalized or unnormalized, depends on 2) 01003 //4 - output query expression 01004 //5 - output query coexpression (compared to 1, which is gene-to-query 01005 // coexpression. This is query-to-query coexpression) 01006 01007 bool outputCoexpression = false; 01008 bool outputNormalized = false; 01009 bool outputExpression = false; 01010 bool outputQueryExpression = false; 01011 bool outputQueryCoexpression = false; 01012 if(mode[0]=='1') 01013 outputCoexpression = true; 01014 if(mode[1]=='1') 01015 outputNormalized = true; 01016 if(mode[2]=='1') 01017 outputExpression = true; 01018 if(mode[3]=='1') 01019 outputQueryExpression = true; 01020 if(mode[4]=='1') 01021 outputQueryCoexpression = true; 01022 01023 vector<string> dsetName, geneName, queryName; 01024 string qname, gname, dname; 01025 string strrbp; //-1: disabled, 0-0.9999 01026 float rbp_p = -1; 01027 01028 if(outputCoexpression || outputQueryCoexpression){ 01029 if( CSeekNetwork::Receive(new_fd, qname)!=0 || 01030 CSeekNetwork::Receive(new_fd, gname)!=0 || 01031 CSeekNetwork::Receive(new_fd, dname)!=0 || 01032 CSeekNetwork::Receive(new_fd, strrbp)!=0){ 01033 fprintf(stderr, "Error: receiving message\n"); 01034 close(new_fd); 01035 continue; 01036 } 01037 CMeta::Tokenize(dname.c_str(), dsetName); 01038 CMeta::Tokenize(gname.c_str(), geneName); 01039 CMeta::Tokenize(qname.c_str(), queryName); 01040 rbp_p = atof(strrbp.c_str()); 01041 } 01042 else{ 01043 if( CSeekNetwork::Receive(new_fd, gname)!=0 || 01044 CSeekNetwork::Receive(new_fd, dname)!=0 ){ 01045 fprintf(stderr, "Error: receiving message\n"); 01046 close(new_fd); 01047 continue; 01048 } 01049 CMeta::Tokenize(dname.c_str(), dsetName); 01050 CMeta::Tokenize(gname.c_str(), geneName); 01051 } 01052 01053 thread_arg[d].threadid = d; 01054 thread_arg[d].new_fd = new_fd; 01055 thread_arg[d].geneName = geneName; 01056 thread_arg[d].datasetName = dsetName; 01057 thread_arg[d].queryName = queryName; 01058 thread_arg[d].outputNormalized = outputNormalized; 01059 thread_arg[d].outputCoexpression = outputCoexpression; 01060 thread_arg[d].outputExpression = outputExpression; 01061 thread_arg[d].outputQueryExpression = outputQueryExpression; 01062 thread_arg[d].outputQueryCoexpression = outputQueryCoexpression; 01063 thread_arg[d].rbp_p = rbp_p; 01064 01065 fprintf(stderr, "Arguments: %d %d %s %s\n", d, new_fd, dname.c_str(), gname.c_str()); 01066 if(outputCoexpression){ 01067 fprintf(stderr, "Arguments: %s\n", qname.c_str()); 01068 } 01069 01070 int ret; 01071 pthread_create(&th[d], NULL, do_query, (void *) &thread_arg[d]); 01072 /*pthread_join(th[d], (void **)&ret); 01073 if(ret==0){ 01074 close(new_fd); 01075 }*/ 01076 } 01077 01078 01079 #ifdef WIN32 01080 pthread_win32_process_detach_np( ); 01081 #endif // WIN32 01082 return 0; }