Fixed MarkDuplicatesSpark handling of unsorted bams #4732
Conversation
* this should match the results of SAMRecordQueryNameComparator exactly
* it operates on GATKRead instead of SAMRecord
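For context, a comparator with those two properties might look roughly like this (a simplified sketch, not the PR's actual class; the real htsjdk SAMRecordQueryNameComparator applies further tie-breaks, e.g. on negative-strand and secondary/supplementary flags, which an exact match would also need to reproduce):

```java
import java.io.Serializable;
import java.util.Comparator;
import org.broadinstitute.hellbender.utils.read.GATKRead;

// Simplified sketch: compare by read name, then put first-of-pair before
// second-of-pair. The htsjdk comparator has additional tie-breaks that
// an exact port must implement as well.
public final class QueryNameComparatorSketch implements Comparator<GATKRead>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(final GATKRead first, final GATKRead second) {
        final int byName = first.getName().compareTo(second.getName());
        if (byName != 0) {
            return byName;
        }
        if (first.isPaired() && second.isPaired()) {
            if (first.isFirstOfPair() && second.isSecondOfPair()) {
                return -1;
            }
            if (first.isSecondOfPair() && second.isFirstOfPair()) {
                return 1;
            }
        }
        return 0;
    }
}
```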
Compile warning:
@jamesemery Back to you with a few requests. Looks good, I think.
```java
    sortedReadsForMarking = reads;
} else {
    headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
    sortedReadsForMarking = ReadsSparkSource.putPairsInSamePartition(headerForTool, SparkUtils.querynameSortReads(reads, numReducers), new JavaSparkContext(reads.context()));
```
Pull the sort onto its own line. It's not a great idea to hide really expensive operations inline with other calls.
I might extract this whole sorting operation into a function, "queryNameSortReadsIfNecessary"
done
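For reference, the extracted helper suggested above might look something like this (a sketch assembled from the calls already in this diff; the method name comes from the review comment, and the body may differ from what was actually merged — it also uses the `JavaSparkContext.fromSparkContext` factory suggested later in this review):

```java
// Sketch: sort by queryname only when the header says the input is not
// already queryname sorted, then repair pairs split across partition edges.
private static JavaRDD<GATKRead> queryNameSortReadsIfNecessary(final JavaRDD<GATKRead> reads,
                                                               final SAMFileHeader headerForTool,
                                                               final int numReducers) {
    if (headerForTool.getSortOrder() == SAMFileHeader.SortOrder.queryname) {
        return reads; // already name-grouped; skip the expensive shuffle
    }
    headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
    final JavaRDD<GATKRead> sortedReads = SparkUtils.querynameSortReads(reads, numReducers);
    return ReadsSparkSource.putPairsInSamePartition(headerForTool, sortedReads,
            JavaSparkContext.fromSparkContext(reads.context()));
}
```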
```java
@Test
// Test that asserts the duplicate marking is sorting agnostic, specifically this is testing that when reads are scrambled across
// partitions in the input that all reads in a group are getting properly duplicate marked together as they are for queryname sorted bams
public void testSortOrderParitioningCorrectness() throws IOException {
```
typo paritioning -> partitioning
done
```java
} else {
    readVoidPairs = rddReadPairs.sortByKey(comparator);
}
return readVoidPairs.keys();
```
This method should call the edge fixing method. We don't want to give people the option to do it wrong.
done
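That is, the tail of the sort method would fold the edge fixing in directly, so no caller can skip it (a sketch using names from this diff; the surrounding `header` variable is assumed to be in scope):

```java
// Sketch: return the keys only after putPairsInSamePartition has run,
// so every caller gets partition-safe, queryname-sorted output.
final JavaRDD<GATKRead> sortedReads = readVoidPairs.keys();
return ReadsSparkSource.putPairsInSamePartition(header, sortedReads,
        JavaSparkContext.fromSparkContext(sortedReads.context()));
```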
```java
JavaPairRDD<String, IndexPair<GATKRead>> keyReadPairs = indexedReads.mapToPair(read -> new Tuple2<>(ReadsKey.keyForRead(
        read.getValue()), read));
keyedReads = keyReadPairs.groupByKey(numReducers);
throw new GATKException("MarkDuplicatesSparkUtils.mark() requires input reads to be queryname sorted, yet the header indicated otherwise");
```
Could you have it print the sort order it thinks it's in?
done
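The improved message only needs to interpolate the header's sort order, along these lines (a sketch; the exact wording in the merged code may differ, and `header` is the assumed local variable):

```java
throw new GATKException("MarkDuplicatesSparkUtils.mark() requires input reads to be queryname sorted, " +
        "yet the header indicated the sort order was: " + header.getSortOrder());
```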
```java
public void testSortOrderParitioningCorrectness() throws IOException {

    JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
    JavaRDD<GATKRead> unsortedReads = generateUnsortedReads(10000,3, ctx, 100, true);
```
stupid nitpick: spaces here are wonky, and on the next line
```java
        samRecordSetBuilder.addPair("READ" + readNameCounter++, 0, start1, start2);
    }
}
final ReadCoordinateComparator coordinateComparitor = new ReadCoordinateComparator(hg19Header);
```
coordinateComparitor is unused, and misspelled.
Yeah, it really does have a poor lot in life, doesn't it?
```java
    }
}

private JavaRDD<GATKRead> generateUnsortedReads(int numReadGroups, int numDuplicatesPerGroup, JavaSparkContext ctx, int numPartitions, boolean coordinate) {
```
these are sorted... rename to generateReadsWithDuplicates
or something like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
could you add a bit of javadoc to this method explaining what it does
done
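The requested javadoc might read something like the following (a sketch inferred from the method's signature and the test code above, not the author's actual wording):

```java
/**
 * Generates an RDD of artificial read pairs containing deliberate duplicates
 * for testing duplicate marking: numReadGroups duplicate groups, each with
 * numDuplicatesPerGroup pairs at the same position, distributed across
 * numPartitions partitions. If coordinate is true the reads are emitted in
 * coordinate-sorted order; otherwise they are grouped by queryname.
 */
```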
```java
sortedHeader.setSortOrder(SAMFileHeader.SortOrder.queryname);

// Using the header flagged as unsorted will result in the reads being sorted again
JavaRDD<GATKRead> unsortedReadsMarked = MarkDuplicatesSpark.mark(unsortedReads,unsortedHeader, MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES,new OpticalDuplicateFinder(),100,true);
```
this is called unsorted, but isn't it actually coordinate sorted?
Why the different num reducers? Is that to find issues with edge fixing? If it is, I think you'd be better off with a specific (and possibly similar) test for that. Since we're always generating pairs, it seems to me that they might never get split across partitions if we're creating an even number of partitions.
So the reason for the different numbers of partitions was that when I first wrote this test there was no exposed way to do the edge fixing for a queryname sorted bam. I didn't want to deal with the problems of having a mispartitioned bam, so I let the queryname sorted reads reside on one partition so the spanning couldn't be wrong. Since this is a test of the coordinate sorted bam marking across partitions and not the edge fixing, I'm not worried.
These changes look good. 👍 When tests pass.
```java
} else {
    headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
    JavaRDD<GATKRead> sortedReads = SparkUtils.querynameSortReads(reads, numReducers);
    sortedReadsForMarking = ReadsSparkSource.putPairsInSamePartition(headerForTool, sortedReads, new JavaSparkContext(reads.context()));
```
@jamesemery I would use JavaSparkContext.fromSparkContext instead of new JavaSparkContext. I think it's the same, but it might change in future sparks.
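Concretely, the suggested swap is one line (sketch):

```java
// Before: new JavaSparkContext(reads.context())
// After: the static factory, which wraps the existing SparkContext
// rather than appearing to construct a new one
final JavaSparkContext ctx = JavaSparkContext.fromSparkContext(reads.context());
```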
Codecov Report
```
@@             Coverage Diff              @@
##             master     #4732     +/-   ##
===============================================
- Coverage     79.977%   79.954%   -0.023%
- Complexity     17397     17524      +127
===============================================
  Files           1080      1081        +1
  Lines          63093     63963      +870
  Branches       10179     10420      +241
===============================================
+ Hits           50460     51141      +681
- Misses          8648      8771      +123
- Partials        3985      4051       +66
```
Added a global sort to the beginning of the tool to ensure we are always working with name-grouped bams. In the future we should evaluate whether alternatives that avoid sorting are necessary.
Fixes #4701