Skip to content

Commit

Permalink
Add multi-thread support to pairaln
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-steinegger committed Feb 26, 2022
1 parent ce7bf53 commit 3f8695e
Showing 1 changed file with 76 additions and 54 deletions.
130 changes: 76 additions & 54 deletions src/util/pairaln.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,82 +25,104 @@ int pairaln(int argc, const char **argv, const Command& command) {
Parameters &par = Parameters::getInstance();
par.parseParameters(argc, argv, command, true, 0, 0);

DBReader<unsigned int> qdbr(par.db1.c_str(), par.db1Index.c_str(), par.threads, DBReader<unsigned int>::USE_LOOKUP_REV);
qdbr.open(DBReader<unsigned int>::NOSORT);
DBReader<unsigned int>::LookupEntry* lookup = qdbr.getLookup();
unsigned int maxFileNumber = 0;
for (unsigned int i = 0; i < qdbr.getLookupSize(); i++) {
maxFileNumber = std::max(maxFileNumber, lookup[i].fileNumber);
}
//build a mapping from file number to ids from lookup
std::vector<std::vector<unsigned int>> fileToIds(maxFileNumber + 1, std::vector<unsigned int>());
for (size_t i = 0; i < qdbr.getLookupSize(); ++i) {
fileToIds[lookup[i].fileNumber].push_back(lookup[i].id);
}

std::string db2NoIndexName = PrefilteringIndexReader::dbPathWithoutIndex(par.db2);
MappingReader* mapping = new MappingReader(db2NoIndexName);

DBReader<unsigned int> alnDbr(par.db3.c_str(), par.db3Index.c_str(), par.threads, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
alnDbr.open(DBReader<unsigned int>::NOSORT);

DBWriter resultWriter(par.db4.c_str(), par.db4Index.c_str(), 1, par.compressed, alnDbr.getDbtype());
DBWriter resultWriter(par.db4.c_str(), par.db4Index.c_str(), par.threads, par.compressed, alnDbr.getDbtype());
resultWriter.open();

Debug::Progress progress(alnDbr.getSize());
do {
int thread_idx = 0;
char buffer[1024 + 32768*4];
Debug::Progress progress(fileToIds.size());
#pragma omp parallel
{
unsigned int thread_idx = 0;
#ifdef OPENMP
thread_idx = (unsigned int) omp_get_thread_num();
#endif
std::vector<Matcher::result_t> result;
result.reserve(100000);
std::unordered_map<unsigned int, size_t> findPair;
std::string output;
output.reserve(100000);
for (size_t fileNumber = 0; fileNumber < fileToIds.size(); fileNumber++) {
char buffer[1024 + 32768 * 4];
findPair.clear();

// find intersection between all proteins
for (size_t i = 0; i < alnDbr.getSize(); i++) {
result.clear();
Matcher::readAlignmentResults(result, alnDbr.getData(i, thread_idx), true);
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
// we don't want to introduce a new field, reuse existing unused field here
result[resIdx].dbOrfStartPos = taxon;
}
std::stable_sort(result.begin(), result.end(), compareByTaxId);
unsigned int prevTaxon = UINT_MAX;
// find pairs
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
if (taxon == prevTaxon){
continue;
// find intersection between all proteins
for (size_t i = 0; i < fileToIds[fileNumber].size(); i++) {
result.clear();
size_t id = fileToIds[fileNumber][i];
Matcher::readAlignmentResults(result, alnDbr.getData(id, thread_idx), true);
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
// we don't want to introduce a new field, reuse existing unused field here
result[resIdx].dbOrfStartPos = taxon;
}
if (findPair.find(taxon) != findPair.end()) {
findPair[taxon]++;
}else{
findPair.emplace(taxon, 1);
std::stable_sort(result.begin(), result.end(), compareByTaxId);
unsigned int prevTaxon = UINT_MAX;
// find pairs
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
if (taxon == prevTaxon) {
continue;
}
if (findPair.find(taxon) != findPair.end()) {
findPair[taxon]++;
} else {
findPair.emplace(taxon, 1);
}
prevTaxon = taxon;
}
prevTaxon = taxon;
}
}

for (size_t i = 0; i < alnDbr.getSize(); i++) {
progress.updateProgress();
result.clear();
output.clear();
Matcher::readAlignmentResults(result, alnDbr.getData(i, thread_idx), true);
// find pairs
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
// we don't want to introduce a new field, reuse existing unused field here
result[resIdx].dbOrfStartPos = taxon;
}
// stable sort is required to assure that best hit is first per taxon
std::stable_sort(result.begin(), result.end(), compareByTaxId);
unsigned int prevTaxon = UINT_MAX;
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = result[resIdx].dbOrfStartPos;
// found pair!
if (taxon != prevTaxon
&& findPair.find(taxon) != findPair.end()
&& findPair[taxon] == alnDbr.getSize()) {
bool hasBacktrace = result[resIdx].backtrace.size() > 0;
size_t len = Matcher::resultToBuffer(buffer, result[resIdx], hasBacktrace, false, false);
output.append(buffer, len);
for (size_t i = 0; i < fileToIds[fileNumber].size(); i++) {
progress.updateProgress();
result.clear();
output.clear();
size_t id = fileToIds[fileNumber][i];
Matcher::readAlignmentResults(result, alnDbr.getData(id, thread_idx), true);
// find pairs
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
// we don't want to introduce a new field, reuse existing unused field here
result[resIdx].dbOrfStartPos = taxon;
}
prevTaxon = taxon;
// stable sort is required to assure that best hit is first per taxon
std::stable_sort(result.begin(), result.end(), compareByTaxId);
unsigned int prevTaxon = UINT_MAX;
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = result[resIdx].dbOrfStartPos;
// found pair!
if (taxon != prevTaxon
&& findPair.find(taxon) != findPair.end()
&& findPair[taxon] == fileToIds[fileNumber].size()) {
bool hasBacktrace = result[resIdx].backtrace.size() > 0;
size_t len = Matcher::resultToBuffer(buffer, result[resIdx], hasBacktrace, false, false);
output.append(buffer, len);
}
prevTaxon = taxon;
}
resultWriter.writeData(output.c_str(), output.length(), alnDbr.getDbKey(id), thread_idx);
}
resultWriter.writeData(output.c_str(), output.length(), alnDbr.getDbKey(i), thread_idx);
}
} while(false);

}
resultWriter.close();
qdbr.close();
// clean up
delete mapping;
alnDbr.close();
Expand Down

0 comments on commit 3f8695e

Please sign in to comment.