From 40a40bccb380017534a5904487b9a35d9ce16fb5 Mon Sep 17 00:00:00 2001 From: Kristian Cibulskis Date: Tue, 11 Jan 2022 13:00:52 -0500 Subject: [PATCH 1/3] presorted avro files, fix performance issue --- .../tools/gvs/extract/ExtractCohort.java | 15 +++- .../gvs/extract/ExtractCohortEngine.java | 82 +++++++++++++------ .../utils/localsort/SortingCollection.java | 7 ++ ...xtractCohortRemoveAnnotationsUnitTest.java | 3 +- 4 files changed, 76 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohort.java b/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohort.java index 1362eefdb21..bbd5fa53b5e 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohort.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohort.java @@ -88,7 +88,7 @@ public enum VQSLODFilteringType { GENOTYPE, SITES, NONE } @Argument( fullName = "vet-avro-file-name", - doc = "Path to unsorted data from Vet table in Avro format", + doc = "Path to data from Vet table in Avro format", mutex = {"cohort-extract-table"}, optional = true ) @@ -96,12 +96,20 @@ public enum VQSLODFilteringType { GENOTYPE, SITES, NONE } @Argument( fullName = "ref-ranges-avro-file-name", - doc = "Path to unsorted data from Vet table in Avro format", + doc = "Path to data from Ref Ranges table in Avro format", mutex = {"cohort-extract-table"}, optional = true ) private GATKPath refRangesAvroFileName = null; + @Argument( + fullName = "presorted-avro-files", + doc = "Indicates if Avro data is pre-sorted", + mutex = {"cohort-extract-table"}, + optional = true + ) + private boolean presortedAvroFiles = false; + @Argument( fullName = "filter-set-name", doc = "Name in filter_set_name column of filtering table to use. 
Which training set should be applied in extract.", @@ -318,7 +326,8 @@ protected void onStartup() { emitPLs, vqslodfilteringType, excludeFilteredSites, - inferredReferenceState); + inferredReferenceState, + presortedAvroFiles); vcfWriter.writeHeader(header); } diff --git a/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java b/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java index 08ad1b7533a..f6568de1bba 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java @@ -1,9 +1,8 @@ package org.broadinstitute.hellbender.tools.gvs.extract; -import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.common.collect.Sets; import static java.util.stream.Collectors.toList; -import htsjdk.samtools.util.CloseableIterator; + import htsjdk.samtools.util.OverlapDetector; import htsjdk.variant.variantcontext.Allele; import htsjdk.variant.variantcontext.GenotypeBuilder; @@ -86,7 +85,7 @@ public class ExtractCohortEngine { private final String filterSetName; private final GQStateEnum inferredReferenceState; - + private final boolean presortedAvroFiles; public ExtractCohortEngine(final String projectID, final VariantContextWriter vcfWriter, @@ -114,7 +113,8 @@ public ExtractCohortEngine(final String projectID, final boolean emitPLs, final ExtractCohort.VQSLODFilteringType VQSLODFilteringType, final boolean excludeFilteredSites, - final GQStateEnum inferredReferenceState + final GQStateEnum inferredReferenceState, + final boolean presortedAvroFiles ) { this.localSortMaxRecordsInRam = localSortMaxRecordsInRam; @@ -156,6 +156,8 @@ public ExtractCohortEngine(final String projectID, this.variantContextMerger = new ReferenceConfidenceVariantContextMerger(annotationEngine, vcfHeader); this.inferredReferenceState = inferredReferenceState; + + this.presortedAvroFiles = 
presortedAvroFiles; } int getTotalNumberOfVariants() { return totalNumberOfVariants; } @@ -230,10 +232,12 @@ public void traverse() { throw new GATKException("Can not process cross-contig boundaries for Ranges implementation"); } + SortedSet sampleIdsToExtract = new TreeSet<>(this.sampleIdToName.keySet()); + if (vetRangesFQDataSet != null) { - createVariantsFromUnsortedBigQueryRanges(vetRangesFQDataSet, this.sampleIdToName.keySet(), minLocation, maxLocation, fullVqsLodMap, fullYngMap, siteFilterMap, noVqslodFilteringRequested); + createVariantsFromUnsortedBigQueryRanges(vetRangesFQDataSet, sampleIdsToExtract, minLocation, maxLocation, fullVqsLodMap, fullYngMap, siteFilterMap, noVqslodFilteringRequested); } else { - createVariantsFromUnsortedAvroRanges(vetAvroFileName, refRangesAvroFileName, this.sampleIdToName.keySet(), minLocation, maxLocation, fullVqsLodMap, fullYngMap, siteFilterMap, noVqslodFilteringRequested); + createVariantsFromUnsortedAvroRanges(vetAvroFileName, refRangesAvroFileName, sampleIdsToExtract, minLocation, maxLocation, fullVqsLodMap, fullYngMap, siteFilterMap, noVqslodFilteringRequested, presortedAvroFiles); } } else { if (cohortTableRef != null) { @@ -296,7 +300,7 @@ private SortingCollection addToVetSortingCollection(final Sorting if (intervalsOverlapDetector.overlapsAny(simpleInverval)) { vbs.setVariant(location); sortingCollection.add(queryRow); - if (recordsProcessed++ % 1000000 == 0) { + if (++recordsProcessed % 1000000 == 0) { long endTime = System.currentTimeMillis(); logger.info("Processed " + recordsProcessed + " VET records in " + (endTime - startTime) + " ms"); startTime = endTime; @@ -304,6 +308,9 @@ private SortingCollection addToVetSortingCollection(final Sorting } } + long endTime = System.currentTimeMillis(); + logger.info("Processed " + recordsProcessed + " VET records in " + (endTime - startTime) + " ms"); + sortingCollection.printTempFileStats(); return sortingCollection; } @@ -320,13 +327,16 @@ private SortingCollection 
addToRefSortingCollection(final Sorting sortingCollection.add(queryRow); } - if (recordsProcessed++ % 1000000 == 0) { + if (++recordsProcessed % 1000000 == 0) { long endTime = System.currentTimeMillis(); logger.info("Processed " + recordsProcessed + " Reference Ranges records in " + (endTime - startTime) + " ms"); startTime = endTime; } } + long endTime = System.currentTimeMillis(); + logger.info("Processed " + recordsProcessed + " Reference Ranges records in " + (endTime - startTime) + " ms"); + sortingCollection.printTempFileStats(); return sortingCollection; } @@ -880,7 +890,7 @@ private SortingCollection createSortedReferenceRangeCollectionFro private void createVariantsFromUnsortedBigQueryRanges( final String fqDatasetName, - final Set sampleIdsToExtract, + final SortedSet sampleIdsToExtract, final Long minLocation, final Long maxLocation, final HashMap>> fullVqsLodMap, @@ -918,31 +928,44 @@ private void createVariantsFromUnsortedBigQueryRanges( private void createVariantsFromUnsortedAvroRanges( final GATKPath vetAvroFileName, final GATKPath refRangesAvroFileName, - final Set sampleIdsToExtract, + final SortedSet sampleIdsToExtract, final Long minLocation, final Long maxLocation, final HashMap>> fullVqsLodMap, final HashMap>> fullYngMap, final HashMap> siteFilterMap, - final boolean noVqslodFilteringRequested) { + final boolean noVqslodFilteringRequested, + final boolean presortedAvroFiles) { final AvroFileReader vetReader = new AvroFileReader(vetAvroFileName); final AvroFileReader refRangesReader = new AvroFileReader(refRangesAvroFileName); - VariantBitSet vbs = new VariantBitSet(minLocation, maxLocation); + Iterable sortedVet; + Iterable sortedReferenceRange; + + if (presortedAvroFiles) { + sortedVet = vetReader; + sortedReferenceRange = refRangesReader; + } else { + VariantBitSet vbs = new VariantBitSet(minLocation, maxLocation); - SortingCollection sortedVet = getAvroSortingCollection(vetReader.getSchema(), localSortMaxRecordsInRam); - 
addToVetSortingCollection(sortedVet, vetReader, vbs); + SortingCollection localSortedVet = getAvroSortingCollection(vetReader.getSchema(), localSortMaxRecordsInRam); + addToVetSortingCollection(localSortedVet, vetReader, vbs); - SortingCollectionsortedReferenceRange = getAvroSortingCollection(refRangesReader.getSchema(), localSortMaxRecordsInRam); - addToRefSortingCollection(sortedReferenceRange, refRangesReader, vbs); + SortingCollection localSortedReferenceRange = getAvroSortingCollection(refRangesReader.getSchema(), localSortMaxRecordsInRam); + addToRefSortingCollection(localSortedReferenceRange, refRangesReader, vbs); + + sortedVet = localSortedVet; + sortedReferenceRange = localSortedReferenceRange; + } createVariantsFromSortedRanges(sampleIdsToExtract, sortedVet, sortedReferenceRange, fullVqsLodMap, fullYngMap, siteFilterMap, noVqslodFilteringRequested); + } - private void createVariantsFromSortedRanges(final Set sampleIdsToExtract, - final SortingCollection sortedVet, - SortingCollection sortedReferenceRange, + private void createVariantsFromSortedRanges(final SortedSet sampleIdsToExtract, + final Iterable sortedVet, + Iterable sortedReferenceRange, final HashMap>> fullVqsLodMap, final HashMap>> fullYngMap, final HashMap> siteFilterMap, @@ -971,7 +994,7 @@ private void createVariantsFromSortedRanges(final Set sampleIdsToExtract, // NOTE: if OverlapDetector takes too long, try using RegionChecker from tws_sv_local_assembler final OverlapDetector intervalsOverlapDetector = OverlapDetector.create(traversalIntervals); - CloseableIterator sortedReferenceRangeIterator = sortedReferenceRange.iterator(); + Iterator sortedReferenceRangeIterator = sortedReferenceRange.iterator(); for (final GenericRecord sortedRow : sortedVet) { final ExtractCohortRecord vetRow = new ExtractCohortRecord(sortedRow); @@ -1049,16 +1072,21 @@ private void handlePotentialSpanningDeletion(ExtractCohortRecord vetRow, Map currentPositionRecords, CloseableIterator sortedReferenceRangeIterator, 
Map> referenceCache, long location, long fromSampleId, long toSampleId, Set sampleIdsToExtract) { + private void processReferenceData(Map currentPositionRecords, Iterator sortedReferenceRangeIterator, Map> referenceCache, long location, long fromSampleId, long toSampleId, SortedSet sampleIdsToExtract) { + // in the case where there are two adjacent samples with variants, this method is called where from is greater than to + // this is ok, there is just no reference data to process but subSet will throw an exception so we handle it with this if block + if (toSampleId >= fromSampleId) { + SortedSet samples = sampleIdsToExtract.subSet(fromSampleId, toSampleId + 1); // subset is start-inclusive, end-exclusive - List samples = sampleIdsToExtract.stream().filter(x -> x >= fromSampleId && x <= toSampleId).sorted().collect(toList()); - for(Long s : samples) { - ExtractCohortRecord e = processReferenceData(sortedReferenceRangeIterator, referenceCache, location, s); - currentPositionRecords.merge(s, e, this::mergeSampleRecord); + // List samples = sampleIdsToExtract.stream().filter(x -> x >= fromSampleId && x <= toSampleId).sorted().collect(toList()); + for (Long s : samples) { + ExtractCohortRecord e = processReferenceData(sortedReferenceRangeIterator, referenceCache, location, s); + currentPositionRecords.merge(s, e, this::mergeSampleRecord); + } } } - private ExtractCohortRecord processReferenceData(CloseableIterator sortedReferenceRangeIterator, Map> referenceCache, long location, long sampleId) { + private ExtractCohortRecord processReferenceData(Iterator sortedReferenceRangeIterator, Map> referenceCache, long location, long sampleId) { String state = processReferenceDataFromCache(referenceCache, location, sampleId); if (state == null) { @@ -1101,7 +1129,7 @@ private String processReferenceDataFromCache(Map> } } - private String processReferenceDataFromStream(CloseableIterator sortedReferenceRangeIterator, Map> referenceCache, long location, long sampleId) { + private 
String processReferenceDataFromStream(Iterator sortedReferenceRangeIterator, Map> referenceCache, long location, long sampleId) { while(sortedReferenceRangeIterator.hasNext()) { final ReferenceRecord refRow = new ReferenceRecord(sortedReferenceRangeIterator.next()); totalRangeRecords++; diff --git a/src/main/java/org/broadinstitute/hellbender/utils/localsort/SortingCollection.java b/src/main/java/org/broadinstitute/hellbender/utils/localsort/SortingCollection.java index 866450189ab..b12a50900cd 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/localsort/SortingCollection.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/localsort/SortingCollection.java @@ -226,8 +226,11 @@ public void setDestructiveIteration(boolean destructiveIteration) { */ public void spillToDisk() { try { + long startTime = System.currentTimeMillis(); Arrays.parallelSort(this.ramRecords, 0, this.numRecordsInRam, this.comparator); + log.info(String.format("%d records in ram sorted in %d ms. ", numRecordsInRam, System.currentTimeMillis() - startTime)); + startTime = System.currentTimeMillis(); final Path f = newTempFile(); try (OutputStream os = tempStreamFactory.wrapTempOutputStream(Files.newOutputStream(f), Defaults.BUFFER_SIZE)) { @@ -242,6 +245,7 @@ public void spillToDisk() { throw new RuntimeIOException("Problem writing temporary file " + f.toUri() + ". Try setting TMP_DIR to a file system with lots of space.", ex); } + log.info(String.format("%d records in ram spilled to disk in %d ms. ", numRecordsInRam, System.currentTimeMillis() - startTime)); this.numRecordsInRam = 0; this.files.add(f); @@ -464,10 +468,13 @@ class InMemoryIterator implements CloseableIterator { private int iterationIndex = 0; InMemoryIterator() { + long startTime = System.currentTimeMillis(); Arrays.parallelSort(SortingCollection.this.ramRecords, 0, SortingCollection.this.numRecordsInRam, SortingCollection.this.comparator); + log.info(String.format("%d records in ram sorted in %d ms. 
", numRecordsInRam, System.currentTimeMillis() - startTime)); + } @Override diff --git a/src/test/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortRemoveAnnotationsUnitTest.java b/src/test/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortRemoveAnnotationsUnitTest.java index 478b5d9c17f..94f502bfd64 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortRemoveAnnotationsUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortRemoveAnnotationsUnitTest.java @@ -47,7 +47,8 @@ public void testRemoveAnnotations() { false, ExtractCohort.VQSLODFilteringType.NONE, false, - GQStateEnum.SIXTY + GQStateEnum.SIXTY, + false ); List variantContexts = VariantContextTestUtils.getVariantContexts(ORIGINAL_TEST_FILE); // list variantContexts from VCF file From 1191e42a59e72c7b04dd24f82bd2f63d91bf04b7 Mon Sep 17 00:00:00 2001 From: Kristian Cibulskis Date: Tue, 11 Jan 2022 13:09:07 -0500 Subject: [PATCH 2/3] updated WDL; JAR --- scripts/variantstore/wdl/GvsCreateFilterSet.wdl | 2 +- scripts/variantstore/wdl/GvsExtractCallset.wdl | 2 +- scripts/variantstore/wdl/GvsImportGenomes.wdl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/variantstore/wdl/GvsCreateFilterSet.wdl b/scripts/variantstore/wdl/GvsCreateFilterSet.wdl index 94bac3c1ab6..da968f44472 100644 --- a/scripts/variantstore/wdl/GvsCreateFilterSet.wdl +++ b/scripts/variantstore/wdl/GvsCreateFilterSet.wdl @@ -24,7 +24,7 @@ workflow GvsCreateFilterSet { File? excluded_intervals String output_file_base_name - File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_fix_flush_20220105/gatk-package-4.2.0.0-452-gb9496ed-SNAPSHOT-local.jar" + File? 
gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_extract_perf_20220111/gatk-package-4.2.0.0-455-g40a40bc-SNAPSHOT-local.jar" File dbsnp_vcf File dbsnp_vcf_index diff --git a/scripts/variantstore/wdl/GvsExtractCallset.wdl b/scripts/variantstore/wdl/GvsExtractCallset.wdl index 163c972bb16..854d5489413 100644 --- a/scripts/variantstore/wdl/GvsExtractCallset.wdl +++ b/scripts/variantstore/wdl/GvsExtractCallset.wdl @@ -46,7 +46,7 @@ workflow GvsExtractCallset { String output_file_base_name String? output_gcs_dir - File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_fix_flush_20220105/gatk-package-4.2.0.0-452-gb9496ed-SNAPSHOT-local.jar" + File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_extract_perf_20220111/gatk-package-4.2.0.0-455-g40a40bc-SNAPSHOT-local.jar" Int local_disk_for_extract = 150 String fq_samples_to_extract_table = "~{data_project}.~{default_dataset}.~{extract_table_prefix}__SAMPLES" diff --git a/scripts/variantstore/wdl/GvsImportGenomes.wdl b/scripts/variantstore/wdl/GvsImportGenomes.wdl index a2e29cd9ee4..7e32b7ab18e 100644 --- a/scripts/variantstore/wdl/GvsImportGenomes.wdl +++ b/scripts/variantstore/wdl/GvsImportGenomes.wdl @@ -17,7 +17,7 @@ workflow GvsImportGenomes { Int batch_size = 1 Int? preemptible_tries - File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_fix_flush_20220105/gatk-package-4.2.0.0-452-gb9496ed-SNAPSHOT-local.jar" + File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_extract_perf_20220111/gatk-package-4.2.0.0-455-g40a40bc-SNAPSHOT-local.jar" String? 
docker } From 473217092cb4c8e484cf4d05f9df5ae7f27c3768 Mon Sep 17 00:00:00 2001 From: Kristian Cibulskis Date: Tue, 11 Jan 2022 13:34:10 -0500 Subject: [PATCH 3/3] PR feedback --- .../hellbender/tools/gvs/extract/ExtractCohortEngine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java b/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java index f6568de1bba..d1de803d364 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/gvs/extract/ExtractCohortEngine.java @@ -1078,7 +1078,6 @@ private void processReferenceData(Map currentPosition if (toSampleId >= fromSampleId) { SortedSet samples = sampleIdsToExtract.subSet(fromSampleId, toSampleId + 1); // subset is start-inclusive, end-exclusive - // List samples = sampleIdsToExtract.stream().filter(x -> x >= fromSampleId && x <= toSampleId).sorted().collect(toList()); for (Long s : samples) { ExtractCohortRecord e = processReferenceData(sortedReferenceRangeIterator, referenceCache, location, s); currentPositionRecords.merge(s, e, this::mergeSampleRecord);