From 6cc82cb68c81497bf8ef9d19c085122cc6be5861 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Thu, 21 Nov 2024 15:01:18 -0600 Subject: [PATCH 01/20] feat: Add flag to disable sequence grouping with -G option --- src/interface/parse_args.hpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/interface/parse_args.hpp b/src/interface/parse_args.hpp index 9ee467e9..de43e730 100644 --- a/src/interface/parse_args.hpp +++ b/src/interface/parse_args.hpp @@ -105,6 +105,7 @@ void parse_args(int argc, args::Flag one_to_one(mapping_opts, "", "Perform one-to-one filtering", {'o', "one-to-one"}); args::Flag lower_triangular(mapping_opts, "", "Only compute the lower triangular for all-vs-all mapping", {'L', "lower-triangular"}); args::ValueFlag skip_prefix(mapping_opts, "C", "map between sequence groups with different prefix [#]", {'Y', "group-prefix"}); + args::Flag disable_grouping(mapping_opts, "", "disable sequence grouping (equivalent to -Y \\0)", {'G', "disable-grouping"}); args::ValueFlag target_prefix(mapping_opts, "pfx", "use only targets whose names start with this prefix", {'T', "target-prefix"}); args::ValueFlag target_list(mapping_opts, "FILE", "file containing list of target sequence names to use", {'R', "target-list"}); args::ValueFlag query_prefix(mapping_opts, "pfxs", "filter queries by comma-separated prefixes", {'Q', "query-prefix"}); @@ -173,7 +174,10 @@ void parse_args(int argc, map_parameters.lower_triangular = lower_triangular ? args::get(lower_triangular) : false; map_parameters.keep_low_pct_id = true; - if (skip_prefix) { + if (disable_grouping) { + map_parameters.prefix_delim = '\0'; + map_parameters.skip_prefix = false; + } else if (skip_prefix) { map_parameters.prefix_delim = args::get(skip_prefix); map_parameters.skip_prefix = map_parameters.prefix_delim != '\0'; } else { From 999d4588408fa57dbb1682be2ad56ad93be84263 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 11:12:24 -0600 Subject: [PATCH 02/20] feat: Implement subset filtering optimization for memory efficiency --- src/map/include/computeMap.hpp | 60 ++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index ba1d8137..65d99dc5 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -149,6 +149,9 @@ namespace skch typedef atomic_queue::AtomicQueue writer_atomic_queue_t; typedef atomic_queue::AtomicQueue query_output_atomic_queue_t; typedef atomic_queue::AtomicQueue fragment_atomic_queue_t; + + // Track maximum chain ID seen across all subsets + std::atomic maxChainIdSeen{0}; void processFragment(FragmentData* fragment, @@ -721,6 +724,12 @@ namespace skch aggregator.join(); + // Filter mappings within this subset before aggregation + for (auto& [querySeqId, mappings] : combinedMappings) { + filterSubsetMappings(mappings, progress); + updateChainIds(mappings); + } + // Reset flags and clear aggregatedMappings for next iteration reader_done.store(false); workers_done.store(false); @@ -2159,6 +2168,47 @@ namespace skch * @param begin Iterator to the start of the chain * @param end Iterator to the end of the chain */ + /** + * @brief Filter mappings within a subset before aggregation + * @param mappings Mappings to filter + * @param param Algorithm parameters + */ + void filterSubsetMappings(MappingResultsVector_t& mappings, progress_meter::ProgressMeter& progress) { + if (mappings.empty()) return; + + // Merge and filter chains within this subset + auto maximallyMergedMappings = mergeMappingsInRange(mappings, param.chain_gap, progress); + filterMaximallyMerged(maximallyMergedMappings, param, progress); + mappings = std::move(maximallyMergedMappings); + } + + /** + * @brief Update chain IDs to prevent conflicts between subsets + * @param mappings Mappings whose chain IDs need updating + * @param maxId Current maximum chain ID seen + */ + void updateChainIds(MappingResultsVector_t& mappings) { + if (mappings.empty()) return; + + // Get current offset + offset_t offset = maxChainIdSeen.load(std::memory_order_relaxed); + + // Update all chain IDs in this subset + for (auto& mapping : mappings) { + mapping.splitMappingId += offset; + } + + // Update maximum seen if needed + if (!mappings.empty()) { + offset_t current_max = maxChainIdSeen.load(std::memory_order_relaxed); + offset_t new_max = mappings.back().splitMappingId; + while (new_max > current_max && + !maxChainIdSeen.compare_exchange_weak(current_max, new_max, + std::memory_order_release, + std::memory_order_relaxed)); + } + } + void computeChainStatistics(std::vector::iterator begin, std::vector::iterator end) { offset_t chain_start_query = std::numeric_limits::max(); offset_t chain_end_query = std::numeric_limits::min(); @@ -2297,8 +2347,14 @@ namespace skch auto& mappings = *(task->second); std::string queryName = idManager->getSequenceName(querySeqId); - processAggregatedMappings(queryName, mappings, progress); - + // Final filtering pass on pre-filtered mappings + if (param.filterMode == filter::MAP || param.filterMode == filter::ONETOONE) { + MappingResultsVector_t filteredMappings; + filterByGroup(mappings, filteredMappings, param.numMappingsForSegment - 1, + param.filterMode == filter::ONETOONE, *idManager, progress); + mappings = std::move(filteredMappings); + } + std::stringstream ss; reportReadMappings(mappings, queryName, ss); From 2ec4192f37bfa9d7d8123d0c81c698edec589ce4 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 11:22:03 -0600 Subject: [PATCH 03/20] refactor: Improve atomic chain ID update with max subset calculation --- src/map/include/computeMap.hpp | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 65d99dc5..4fab0001 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -2190,22 +2190,23 @@ namespace skch void updateChainIds(MappingResultsVector_t& mappings) { if (mappings.empty()) return; - // Get current offset - offset_t offset = maxChainIdSeen.load(std::memory_order_relaxed); - - // Update all chain IDs in this subset - for (auto& mapping : mappings) { - mapping.splitMappingId += offset; + // Find maximum chain ID in this subset + offset_t max_subset_id = 0; + for (const auto& mapping : mappings) { + max_subset_id = std::max(max_subset_id, mapping.splitMappingId); } - // Update maximum seen if needed - if (!mappings.empty()) { - offset_t current_max = maxChainIdSeen.load(std::memory_order_relaxed); - offset_t new_max = mappings.back().splitMappingId; - while (new_max > current_max && - !maxChainIdSeen.compare_exchange_weak(current_max, new_max, - std::memory_order_release, - std::memory_order_relaxed)); + // Get current offset and try to update with our max + offset_t current_max = maxChainIdSeen.load(std::memory_order_relaxed); + while (!maxChainIdSeen.compare_exchange_weak(current_max, current_max + max_subset_id + 1, + std::memory_order_release, + std::memory_order_relaxed)) { + // If CAS failed, current_max has the latest value + } + + // Update all chain IDs in this subset with the new offset + for (auto& mapping : mappings) { + mapping.splitMappingId += current_max; } } From b4ee587e7238b943b76046b30ffdef30cc20d8ee Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 13:09:51 -0600 Subject: [PATCH 04/20] refactor: Improve mapping aggregation across target subsets --- src/map/include/computeMap.hpp | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 4fab0001..4c8e273e 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -684,6 +684,9 @@ namespace skch "[wfmash::mashmap] mapping (" + std::to_string(subset_count + 1) + "/" + std::to_string(total_subsets) + ")"); + // Create temporary storage for this subset's mappings + std::unordered_map subsetMappings; + // Launch reader thread std::thread reader([&]() { reader_thread(input_queue, reader_done, progress, *idManager); @@ -704,9 +707,9 @@ namespace skch }); } - // Launch aggregator thread + // Launch aggregator thread with subset storage std::thread aggregator([&]() { - aggregator_thread(merged_queue, workers_done, combinedMappings); + aggregator_thread(merged_queue, workers_done, subsetMappings); }); // Wait for all threads to complete @@ -724,13 +727,24 @@ namespace skch aggregator.join(); - // Filter mappings within this subset before aggregation - for (auto& [querySeqId, mappings] : combinedMappings) { + // Filter mappings within this subset before merging with previous results + for (auto& [querySeqId, mappings] : subsetMappings) { filterSubsetMappings(mappings, progress); updateChainIds(mappings); + + // Merge with existing mappings for this query + if (combinedMappings.find(querySeqId) == combinedMappings.end()) { + combinedMappings[querySeqId] = std::move(mappings); + } else { + combinedMappings[querySeqId].insert( + combinedMappings[querySeqId].end(), + std::make_move_iterator(mappings.begin()), + std::make_move_iterator(mappings.end()) + ); + } } - // Reset flags and clear aggregatedMappings for next iteration + // Reset flags for next iteration reader_done.store(false); workers_done.store(false); fragments_done.store(false); From b64fadbad6858770f19f8b49be7172d92b477921 Mon Sep 17 00:00:00 2001 From: Erik Garrison Date: Mon, 25 Nov 2024 13:28:11 -0600 Subject: [PATCH 05/20] check if the approx mapping flag is set to correctly detect if we are approx mapping (params are parsed later) --- src/interface/parse_args.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/interface/parse_args.hpp b/src/interface/parse_args.hpp index de43e730..213bdad1 100644 --- a/src/interface/parse_args.hpp +++ b/src/interface/parse_args.hpp @@ -307,7 +307,7 @@ void parse_args(int argc, exit(1); } - if (!yeet_parameters.approx_mapping && s > 10000) { + if (!approx_mapping && s > 10000) { std::cerr << "[wfmash] ERROR: segment length (-s) must be <= 10kb when running alignment." << std::endl << "[wfmash] For larger values, use -m/--approx-mapping to generate mappings," << std::endl << "[wfmash] then align them with: wfmash ... -i mappings.paf" << std::endl; @@ -336,7 +336,7 @@ void parse_args(int argc, exit(1); } - if (!yeet_parameters.approx_mapping && l > 30000) { + if (!approx_mapping && l > 30000) { std::cerr << "[wfmash] ERROR: block length (-l) must be <= 30kb when running alignment." << std::endl << "[wfmash] For larger values, use -m/--approx-mapping to generate mappings," << std::endl << "[wfmash] then align them with: wfmash ... -i mappings.paf" << std::endl; @@ -367,7 +367,7 @@ void parse_args(int argc, std::cerr << "[wfmash] ERROR: max mapping length must be greater than 0." << std::endl; exit(1); } - if (!yeet_parameters.approx_mapping && l > 100000) { + if (!approx_mapping && l > 100000) { std::cerr << "[wfmash] ERROR: max mapping length (-P) must be <= 100kb when running alignment." << std::endl << "[wfmash] For larger values, use -m/--approx-mapping to generate mappings," << std::endl << "[wfmash] then align them with: wfmash ... -i mappings.paf" << std::endl; From a735977c8806354027dca5fcf68291a198572810 Mon Sep 17 00:00:00 2001 From: Erik Garrison Date: Mon, 25 Nov 2024 16:27:49 -0600 Subject: [PATCH 06/20] refactor: Improve mapping filtering and chaining logic in parallel processing --- src/map/include/computeMap.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 4c8e273e..386aeb25 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -729,7 +729,9 @@ namespace skch // Filter mappings within this subset before merging with previous results for (auto& [querySeqId, mappings] : subsetMappings) { + // XXXXX this filtering should be done within each WORKER THREAD which runs once PER QUERY and is thus able to filterSubsetMappings(mappings, progress); + // XXXXX this should be done in the aggregator thread updateChainIds(mappings); // Merge with existing mappings for this query From 3fa77f9b0a640761ec6e15a3adc046dd4e1d099f Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:27:51 -0600 Subject: [PATCH 07/20] refactor: Move subset mapping filtering and chain ID updates to correct threads --- src/map/include/computeMap.hpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 386aeb25..a8ad7227 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -729,10 +729,6 @@ namespace skch // Filter mappings within this subset before merging with previous results for (auto& [querySeqId, mappings] : subsetMappings) { - // XXXXX this filtering should be done within each WORKER THREAD which runs once PER QUERY and is thus able to - filterSubsetMappings(mappings, progress); - // XXXXX this should be done in the aggregator thread - updateChainIds(mappings); // Merge with existing mappings for this query if (combinedMappings.find(querySeqId) == combinedMappings.end()) { @@ -964,6 +960,9 @@ namespace skch } mappingBoundarySanityCheck(input, output->results); + + // Filter mappings within this subset + filterSubsetMappings(output->results, input->progress); return output; } @@ -1030,10 +1029,14 @@ namespace skch QueryMappingOutput* output = nullptr; if (merged_queue.try_pop(output)) { seqno_t querySeqId = idManager->getSequenceId(output->queryName); + auto& mappings = output->results; + // Update chain IDs before merging into combined mappings + updateChainIds(mappings); + combinedMappings[querySeqId].insert( combinedMappings[querySeqId].end(), - output->results.begin(), - output->results.end() + mappings.begin(), + mappings.end() ); delete output; } else if (workers_done.load() && merged_queue.was_empty()) { From b3bada2898fa08e9c3cb33c3ec8c7d39b88377e9 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:35:15 -0600 Subject: [PATCH 08/20] refactor: Modify mapping filtering to preserve merged and non-merged mappings --- src/map/include/computeMap.hpp | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index a8ad7227..a2b0f915 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -55,7 +55,8 @@ namespace skch { struct QueryMappingOutput { std::string queryName; - std::vector results; + std::vector results; // Non-merged mappings + std::vector mergedResults; // Maximally merged mappings std::mutex mutex; progress_meter::ProgressMeter& progress; }; @@ -181,6 +182,10 @@ namespace skch { std::lock_guard lock(fragment->output->mutex); fragment->output->results.insert(fragment->output->results.end(), l2Mappings.begin(), l2Mappings.end()); + // Initialize mergedResults with same mappings + if (param.mergeMappings && param.split) { + fragment->output->mergedResults.insert(fragment->output->mergedResults.end(), l2Mappings.begin(), l2Mappings.end()); + } } // Update progress after processing the fragment @@ -961,8 +966,10 @@ namespace skch mappingBoundarySanityCheck(input, output->results); - // Filter mappings within this subset - filterSubsetMappings(output->results, input->progress); + // Filter and get both merged and non-merged mappings + auto [nonMergedMappings, mergedMappings] = filterSubsetMappings(output->results, input->progress); + output->results = std::move(nonMergedMappings); + output->mergedResults = std::move(mergedMappings); return output; } @@ -2192,13 +2199,17 @@ namespace skch * @param mappings Mappings to filter * @param param Algorithm parameters */ - void filterSubsetMappings(MappingResultsVector_t& mappings, progress_meter::ProgressMeter& progress) { - if (mappings.empty()) return; + std::pair filterSubsetMappings(MappingResultsVector_t& mappings, progress_meter::ProgressMeter& progress) { + if (mappings.empty()) return {MappingResultsVector_t(), MappingResultsVector_t()}; - // Merge and filter chains within this subset + // Only merge once and keep both versions auto maximallyMergedMappings = mergeMappingsInRange(mappings, param.chain_gap, progress); - filterMaximallyMerged(maximallyMergedMappings, param, progress); - mappings = std::move(maximallyMergedMappings); + + // Update chain IDs consistently across both sets + updateChainIds(mappings); + updateChainIds(maximallyMergedMappings); + + return {std::move(mappings), std::move(maximallyMergedMappings)}; } /** From b09c7228d4a0de39a372971a5fd7433790797768 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:45:08 -0600 Subject: [PATCH 09/20] feat: Add mapping filtering logic for merged and non-merged results --- src/map/include/computeMap.hpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index a2b0f915..4cc17120 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -674,6 +674,25 @@ namespace skch } output_thread.join(); + // Process both merged and non-merged mappings + for (auto& [querySeqId, mappings] : combinedMappings) { + if (param.mergeMappings && param.split) { + filterMaximallyMerged(mappings.mergedResults, param, progress); + robin_hood::unordered_set kept_chains; + for (auto &mapping : mappings.mergedResults) { + kept_chains.insert(mapping.splitMappingId); + } + mappings.results.erase( + std::remove_if(mappings.results.begin(), mappings.results.end(), + [&kept_chains](const MappingResult &mapping) { + return !kept_chains.count(mapping.splitMappingId); + }), + mappings.results.end()); + } else { + filterNonMergedMappings(mappings.results, param, progress); + } + } + progress.finish(); } From ea1b006d11b8a568c529e3714ae5563d89481f02 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:45:32 -0600 Subject: [PATCH 10/20] fix: Update QueryMappingOutput and CombinedMappingResults to resolve compilation errors --- src/map/include/computeMap.hpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 4cc17120..1418695f 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -59,6 +59,9 @@ namespace skch std::vector mergedResults; // Maximally merged mappings std::mutex mutex; progress_meter::ProgressMeter& progress; + QueryMappingOutput(const std::string& name, const std::vector& r, + const std::vector& mr, progress_meter::ProgressMeter& p) + : queryName(name), results(r), mergedResults(mr), progress(p) {} }; struct FragmentData { @@ -556,7 +559,11 @@ namespace skch } std::cerr << ", average size: " << std::fixed << std::setprecision(0) << avg_subset_size << "bp" << std::endl; - std::unordered_map combinedMappings; + struct CombinedMappingResults { + std::vector results; + std::vector mergedResults; + }; + std::unordered_map combinedMappings; // Build index for the current subset // Open the index file once From 0247b1700f42f87a861f12f2f97f7a43588c1527 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:45:54 -0600 Subject: [PATCH 11/20] fix: Resolve compilation errors in computeMap.hpp mapping logic --- src/map/include/computeMap.hpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 1418695f..198e4a5d 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -559,11 +559,8 @@ namespace skch } std::cerr << ", average size: " << std::fixed << std::setprecision(0) << avg_subset_size << "bp" << std::endl; - struct CombinedMappingResults { - std::vector results; - std::vector mergedResults; - }; - std::unordered_map combinedMappings; + typedef std::vector MappingResultsVector_t; + std::unordered_map combinedMappings; // Build index for the current subset // Open the index file once @@ -640,7 +637,7 @@ namespace skch // Get total count of mappings uint64_t totalMappings = 0; for (const auto& [querySeqId, mappings] : combinedMappings) { - totalMappings += mappings.size(); + totalMappings += mappings.results.size() + mappings.mergedResults.size(); } // Initialize progress logger @@ -659,7 +656,7 @@ namespace skch // Enqueue tasks for (auto& [querySeqId, mappings] : combinedMappings) { - auto* task = new std::pair(querySeqId, &mappings); + auto* task = new std::pair(querySeqId, &mappings.results); while (!aggregate_queue.try_push(task)) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } From b0dcb869e570d4a29b43a7e2b1227046c80bde6b Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:46:21 -0600 Subject: [PATCH 12/20] fix: Update computeMap.hpp to resolve compilation errors with mapping results --- src/map/include/computeMap.hpp | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 198e4a5d..7e721282 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -560,7 +560,12 @@ namespace skch std::cerr << ", average size: " << std::fixed << std::setprecision(0) << avg_subset_size << "bp" << std::endl; typedef std::vector MappingResultsVector_t; - std::unordered_map combinedMappings; + + struct CombinedMappingResults { + MappingResultsVector_t results; + MappingResultsVector_t mergedResults; + }; + std::unordered_map combinedMappings; // Build index for the current subset // Open the index file once @@ -637,7 +642,7 @@ namespace skch // Get total count of mappings uint64_t totalMappings = 0; for (const auto& [querySeqId, mappings] : combinedMappings) { - totalMappings += mappings.results.size() + mappings.mergedResults.size(); + totalMappings += mappings.size(); } // Initialize progress logger @@ -656,7 +661,7 @@ namespace skch // Enqueue tasks for (auto& [querySeqId, mappings] : combinedMappings) { - auto* task = new std::pair(querySeqId, &mappings.results); + auto* task = new std::pair(querySeqId, &mappings); while (!aggregate_queue.try_push(task)) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -681,19 +686,9 @@ namespace skch // Process both merged and non-merged mappings for (auto& [querySeqId, mappings] : combinedMappings) { if (param.mergeMappings && param.split) { - filterMaximallyMerged(mappings.mergedResults, param, progress); - robin_hood::unordered_set kept_chains; - for (auto &mapping : mappings.mergedResults) { - kept_chains.insert(mapping.splitMappingId); - } - mappings.results.erase( - std::remove_if(mappings.results.begin(), mappings.results.end(), - [&kept_chains](const MappingResult &mapping) { - return !kept_chains.count(mapping.splitMappingId); - }), - mappings.results.end()); + filterMaximallyMerged(mappings, param, progress); } else { - filterNonMergedMappings(mappings.results, param, progress); + filterNonMergedMappings(mappings, param, progress); } } From 28a3a8ff1bcee106a47ecde860cb8c1691f93faf Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:46:47 -0600 Subject: [PATCH 13/20] fix: Refactor CombinedMappingResults to simplify mapping handling --- src/map/include/computeMap.hpp | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 7e721282..db581038 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -560,12 +560,7 @@ namespace skch std::cerr << ", average size: " << std::fixed << std::setprecision(0) << avg_subset_size << "bp" << std::endl; typedef std::vector MappingResultsVector_t; - - struct CombinedMappingResults { - MappingResultsVector_t results; - MappingResultsVector_t mergedResults; - }; - std::unordered_map combinedMappings; + std::unordered_map combinedMappings; // Build index for the current subset // Open the index file once @@ -642,7 +637,7 @@ namespace skch // Get total count of mappings uint64_t totalMappings = 0; for (const auto& [querySeqId, mappings] : combinedMappings) { - totalMappings += mappings.size(); + totalMappings += mappings.results.size() + mappings.mergedResults.size(); } // Initialize progress logger @@ -661,7 +656,7 @@ namespace skch // Enqueue tasks for (auto& [querySeqId, mappings] : combinedMappings) { - auto* task = new std::pair(querySeqId, &mappings); + auto* task = new std::pair(querySeqId, &mappings.results); while (!aggregate_queue.try_push(task)) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -686,9 +681,9 @@ namespace skch // Process both merged and non-merged mappings for (auto& [querySeqId, mappings] : combinedMappings) { if (param.mergeMappings && param.split) { - filterMaximallyMerged(mappings, param, progress); + filterMaximallyMerged(mappings.results, param, progress); } else { - filterNonMergedMappings(mappings, param, progress); + filterNonMergedMappings(mappings.results, param, progress); } } From f45b33029232ceab964c3bbb4346b75e257e5c17 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:47:10 -0600 Subject: [PATCH 14/20] fix: Resolve compilation errors in computeMap.hpp by updating mapping vector handling --- src/map/include/computeMap.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index db581038..b2f33a9c 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -637,7 +637,7 @@ namespace skch // Get total count of mappings uint64_t totalMappings = 0; for (const auto& [querySeqId, mappings] : combinedMappings) { - totalMappings += mappings.results.size() + mappings.mergedResults.size(); + totalMappings += mappings.size(); } // Initialize progress logger @@ -656,7 +656,7 @@ namespace skch // Enqueue tasks for (auto& [querySeqId, mappings] : combinedMappings) { - auto* task = new std::pair(querySeqId, &mappings.results); + auto* task = new std::pair(querySeqId, &mappings); while (!aggregate_queue.try_push(task)) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -681,9 +681,9 @@ namespace skch // Process both merged and non-merged mappings for (auto& [querySeqId, mappings] : combinedMappings) { if (param.mergeMappings && param.split) { - filterMaximallyMerged(mappings.results, param, progress); + filterMaximallyMerged(mappings, param, progress); } else { - filterNonMergedMappings(mappings.results, param, progress); + filterNonMergedMappings(mappings, param, progress); } } From 25d11a4a3c9926171c6832634a9e126da1508d71 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:54:54 -0600 Subject: [PATCH 15/20] refactor: Optimize chain ID generation to use sequential, smaller IDs --- src/map/include/computeMap.hpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index b2f33a9c..bbb9eef3 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -2233,24 +2233,24 @@ namespace skch void updateChainIds(MappingResultsVector_t& mappings) { if (mappings.empty()) return; - // Find maximum chain ID in this subset - offset_t max_subset_id = 0; + // Create a mapping of old IDs to new sequential IDs + std::unordered_map id_map; + offset_t next_id = maxChainIdSeen.fetch_add(1, std::memory_order_relaxed); + + // First pass: build mapping of old IDs to new sequential IDs for (const auto& mapping : mappings) { - max_subset_id = std::max(max_subset_id, mapping.splitMappingId); - } - - // Get current offset and try to update with our max - offset_t current_max = maxChainIdSeen.load(std::memory_order_relaxed); - while (!maxChainIdSeen.compare_exchange_weak(current_max, current_max + max_subset_id + 1, - std::memory_order_release, - std::memory_order_relaxed)) { - // If CAS failed, current_max has the latest value + if (id_map.find(mapping.splitMappingId) == id_map.end()) { + id_map[mapping.splitMappingId] = next_id++; + } } - // Update all chain IDs in this subset with the new offset + // Second pass: update the IDs for (auto& mapping : mappings) { - mapping.splitMappingId += current_max; + mapping.splitMappingId = id_map[mapping.splitMappingId]; } + + // Update maxChainIdSeen to reflect the actual number of new IDs used + maxChainIdSeen.store(next_id, std::memory_order_release); } void computeChainStatistics(std::vector::iterator begin, std::vector::iterator end) { From e218f1390ae2b2e10f3a417700a85d290db094f8 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 16:57:41 -0600 Subject: [PATCH 16/20] refactor: Preserve chain relationships in updateChainIds by using base offset --- src/map/include/computeMap.hpp | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index bbb9eef3..5cb867c8 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -2233,24 +2233,22 @@ namespace skch void updateChainIds(MappingResultsVector_t& mappings) { if (mappings.empty()) return; - // Create a mapping of old IDs to new sequential IDs - std::unordered_map id_map; - offset_t next_id = maxChainIdSeen.fetch_add(1, std::memory_order_relaxed); + // Get current offset + offset_t base_id = maxChainIdSeen.fetch_add(1, std::memory_order_relaxed); - // First pass: build mapping of old IDs to new sequential IDs + // Find the maximum chain ID in this subset + offset_t max_chain_id = 0; for (const auto& mapping : mappings) { - if (id_map.find(mapping.splitMappingId) == id_map.end()) { - id_map[mapping.splitMappingId] = next_id++; - } + max_chain_id = std::max(max_chain_id, mapping.splitMappingId); } - - // Second pass: update the IDs + + // Only update IDs if they haven't been set yet (== position index) + // This preserves chain relationships from mergeMappingsInRange for (auto& mapping : mappings) { - mapping.splitMappingId = id_map[mapping.splitMappingId]; + if (mapping.splitMappingId <= max_chain_id) { + mapping.splitMappingId += base_id; + } } - - // Update maxChainIdSeen to reflect the actual number of new IDs used - maxChainIdSeen.store(next_id, std::memory_order_release); } void computeChainStatistics(std::vector::iterator begin, std::vector::iterator end) { From f1d6046713ba6ac4a5cbff5855d6c22dfa234552 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 17:02:20 -0600 Subject: [PATCH 17/20] refactor: Implement dense chain ID mapping with compact range generation --- src/map/include/computeMap.hpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 5cb867c8..1ab56167 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -2236,18 +2236,20 @@ namespace skch // Get current offset offset_t base_id = maxChainIdSeen.fetch_add(1, std::memory_order_relaxed); - // Find the maximum chain ID in this subset - offset_t max_chain_id = 0; + // Build map of old chain IDs to dense range starting at 0 + std::unordered_map id_map; + offset_t next_id = 0; + + // First pass - build the mapping for (const auto& mapping : mappings) { - max_chain_id = std::max(max_chain_id, mapping.splitMappingId); + if (id_map.count(mapping.splitMappingId) == 0) { + id_map[mapping.splitMappingId] = next_id++; + } } - // Only update IDs if they haven't been set yet (== position index) - // This preserves chain relationships from mergeMappingsInRange + // Second pass - update the IDs to dense range for (auto& mapping : mappings) { - if (mapping.splitMappingId <= max_chain_id) { - mapping.splitMappingId += base_id; - } + mapping.splitMappingId = id_map[mapping.splitMappingId] + base_id; } } From e97d2244998f14aefd7c4d9f6c65aa24aa6a0dab Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 17:08:29 -0600 Subject: [PATCH 18/20] refactor: Move chain ID compaction to MapModule with atomic offset --- src/map/include/computeMap.hpp | 47 +++++++++++++++++----------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/src/map/include/computeMap.hpp b/src/map/include/computeMap.hpp index 1ab56167..af7f6984 100644 --- a/src/map/include/computeMap.hpp +++ b/src/map/include/computeMap.hpp @@ -1050,9 +1050,7 @@ namespace skch if (merged_queue.try_pop(output)) { seqno_t querySeqId = idManager->getSequenceId(output->queryName); auto& mappings = output->results; - // Update chain IDs before merging into combined mappings - updateChainIds(mappings); - + // Chain IDs are already compacted in mapModule combinedMappings[querySeqId].insert( combinedMappings[querySeqId].end(), mappings.begin(), @@ -2218,41 +2216,42 @@ namespace skch // Only merge once and keep both versions auto maximallyMergedMappings = mergeMappingsInRange(mappings, param.chain_gap, progress); - // Update chain IDs consistently across both sets - updateChainIds(mappings); - updateChainIds(maximallyMergedMappings); - - return {std::move(mappings), std::move(maximallyMergedMappings)}; - } - - /** - * @brief Update chain IDs to prevent conflicts between subsets - * @param mappings Mappings whose chain IDs need updating - * @param maxId Current maximum chain ID seen - */ - void updateChainIds(MappingResultsVector_t& mappings) { - if (mappings.empty()) return; - - // Get current offset - offset_t base_id = maxChainIdSeen.fetch_add(1, std::memory_order_relaxed); - - // Build map of old chain IDs to dense range starting at 0 + // Build dense chain ID mapping std::unordered_map id_map; offset_t next_id = 0; - // First pass - build the mapping + // First pass - build the mapping from both sets for (const auto& mapping : mappings) { if (id_map.count(mapping.splitMappingId) == 0) { id_map[mapping.splitMappingId] = next_id++; } } + for (const auto& mapping : maximallyMergedMappings) { + if (id_map.count(mapping.splitMappingId) == 0) { + id_map[mapping.splitMappingId] = next_id++; + } + } - // Second pass - update the IDs to dense range + // Get atomic offset for this batch of chain IDs + offset_t base_id = maxChainIdSeen.fetch_add(id_map.size(), std::memory_order_relaxed); + + // Apply compacted IDs with offset for (auto& mapping : mappings) { mapping.splitMappingId = id_map[mapping.splitMappingId] + base_id; } + for (auto& mapping : maximallyMergedMappings) { + mapping.splitMappingId = id_map[mapping.splitMappingId] + base_id; + } + + return {std::move(mappings), std::move(maximallyMergedMappings)}; } + /** + * @brief Update chain IDs to prevent conflicts between subsets + * @param mappings Mappings whose chain IDs need updating + * @param maxId Current maximum chain ID seen + */ + void computeChainStatistics(std::vector::iterator begin, std::vector::iterator end) { offset_t chain_start_query = std::numeric_limits::max(); offset_t chain_end_query = std::numeric_limits::min(); From a13e992bdc96f2347ed171020a6e33a77ace4f79 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Mon, 25 Nov 2024 17:24:33 -0600 Subject: [PATCH 19/20] feat: Add -X flag to control self-mapping behavior with -G --- src/interface/parse_args.hpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/interface/parse_args.hpp b/src/interface/parse_args.hpp index 213bdad1..1ebb40bf 100644 --- a/src/interface/parse_args.hpp +++ b/src/interface/parse_args.hpp @@ -105,7 +105,8 @@ void parse_args(int argc, args::Flag one_to_one(mapping_opts, "", "Perform one-to-one filtering", {'o', "one-to-one"}); args::Flag lower_triangular(mapping_opts, "", "Only compute the lower triangular for all-vs-all mapping", {'L', "lower-triangular"}); args::ValueFlag skip_prefix(mapping_opts, "C", "map between sequence groups with different prefix [#]", {'Y', "group-prefix"}); - args::Flag disable_grouping(mapping_opts, "", "disable sequence grouping (equivalent to -Y \\0)", {'G', "disable-grouping"}); + args::Flag disable_grouping(mapping_opts, "", "disable sequence grouping and exclude self mappings", {'G', "disable-grouping"}); + args::Flag enable_self_mappings(mapping_opts, "", "enable self mappings (overrides -G)", {'X', "self-maps"}); args::ValueFlag target_prefix(mapping_opts, "pfx", "use only targets whose names start with this prefix", {'T', "target-prefix"}); args::ValueFlag target_list(mapping_opts, "FILE", "file containing list of target sequence names to use", {'R', "target-list"}); args::ValueFlag query_prefix(mapping_opts, "pfxs", "filter queries by comma-separated prefixes", {'Q', "query-prefix"}); @@ -170,13 +171,16 @@ void parse_args(int argc, exit(1); } - map_parameters.skip_self = false; + map_parameters.skip_self = !args::get(enable_self_mappings); map_parameters.lower_triangular = lower_triangular ? args::get(lower_triangular) : false; map_parameters.keep_low_pct_id = true; if (disable_grouping) { map_parameters.prefix_delim = '\0'; map_parameters.skip_prefix = false; + if (!args::get(enable_self_mappings)) { + map_parameters.skip_self = true; + } } else if (skip_prefix) { map_parameters.prefix_delim = args::get(skip_prefix); map_parameters.skip_prefix = map_parameters.prefix_delim != '\0'; From ce2175d49284244e8a697b01190fa747bf9babc4 Mon Sep 17 00:00:00 2001 From: "Erik Garrison (aider)" Date: Wed, 27 Nov 2024 11:32:35 -0600 Subject: [PATCH 20/20] refactor: Remove redundant mappings parameter and use -n/--mappings for segment mapping count --- src/interface/parse_args.hpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/interface/parse_args.hpp b/src/interface/parse_args.hpp index 1ebb40bf..a49fe4ea 100644 --- a/src/interface/parse_args.hpp +++ b/src/interface/parse_args.hpp @@ -99,7 +99,7 @@ void parse_args(int argc, args::Group mapping_opts(options_group, "Mapping:"); args::Flag approx_mapping(mapping_opts, "", "output approximate mappings (no alignment)", {'m', "approx-mapping"}); args::ValueFlag map_pct_identity(mapping_opts, "FLOAT", "minimum mapping identity [70]", {'p', "map-pct-id"}); - args::ValueFlag num_mappings(mapping_opts, "INT", "number of mappings to keep per query/target pair [1]", {'n', "mappings"}); + args::ValueFlag num_mappings(mapping_opts, "INT", "number of mappings to keep per segment [1]", {'n', "mappings"}); args::ValueFlag segment_length(mapping_opts, "INT", "segment length for mapping [1k]", {'s', "segment-length"}); args::ValueFlag block_length(mapping_opts, "INT", "minimum block length [3*segment-length]", {'l', "block-length"}); args::Flag one_to_one(mapping_opts, "", "Perform one-to-one filtering", {'o', "one-to-one"}); @@ -667,12 +667,11 @@ void parse_args(int argc, } #endif - args::ValueFlag num_mappings_for_segments(mapping_opts, "N", "number of mappings per segment [1]", {"mappings-per-segment"}); - if (num_mappings_for_segments) { - if (args::get(num_mappings_for_segments) > 0) { - map_parameters.numMappingsForSegment = args::get(num_mappings_for_segments) ; + if (num_mappings) { + if (args::get(num_mappings) > 0) { + map_parameters.numMappingsForSegment = args::get(num_mappings); } else { - std::cerr << "[wfmash] ERROR, skch::parseandSave, the number of mappings to retain for each segment has to be grater than 0." << std::endl; + std::cerr << "[wfmash] ERROR: the number of mappings to retain (-n) must be greater than 0." << std::endl; exit(1); } } else {