diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java
index 20c0356fcb8..354ba8adcf5 100644
--- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java
+++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java
@@ -117,7 +117,7 @@ public JavaRDD<GATKRead> getParallelReads(final String readFileName, final Strin
             }
             return null;
         }).filter(v1 -> v1 != null);
-        return putPairsInSamePartition(header, reads);
+        return putPairsInSamePartition(header, reads, ctx);
     }
 
     /**
@@ -164,7 +164,7 @@ public JavaRDD<GATKRead> getADAMReads(final String inputPath, final TraversalPar
                 .values();
         JavaRDD<GATKRead> readsRdd = recordsRdd.map(record -> new BDGAlignmentRecordToGATKReadAdapter(record, bHeader.getValue()));
         JavaRDD<GATKRead> filteredRdd = readsRdd.filter(record -> samRecordOverlaps(record.convertToSAMRecord(header), traversalParameters));
-        return putPairsInSamePartition(header, filteredRdd);
+        return putPairsInSamePartition(header, filteredRdd, ctx);
     }
 
     /**
@@ -209,7 +209,7 @@ public boolean accept(Path path) {
      * Ensure reads in a pair fall in the same partition (input split), if the reads are queryname-sorted,
      * or querygroup sorted, so they are processed together. No shuffle is needed.
      */
-    JavaRDD<GATKRead> putPairsInSamePartition(final SAMFileHeader header, final JavaRDD<GATKRead> reads) {
+    public static JavaRDD<GATKRead> putPairsInSamePartition(final SAMFileHeader header, final JavaRDD<GATKRead> reads, final JavaSparkContext ctx) {
         if (!ReadUtils.isReadNameGroupedBam(header)) {
             return reads;
         }
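One detail worth calling out in the change above: putPairsInSamePartition is now static and takes a JavaSparkContext. When a caller holds only an RDD, the Java-friendly context wrapper can be recovered from the RDD itself, which is the idiom MarkDuplicatesSpark uses further down. A minimal sketch of that idiom (illustrative class and method names, not GATK API):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ContextRecovery {
        // Recover the JavaSparkContext from any RDD, so static helpers that need a context
        // (like the newly static putPairsInSamePartition) can be called without threading
        // a context parameter through every intermediate method.
        public static <T> JavaSparkContext contextOf(final JavaRDD<T> rdd) {
            return JavaSparkContext.fromSparkContext(rdd.context());
        }
    }
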
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java
index ca507d283f6..a3576d12438 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java
@@ -17,6 +17,8 @@
 import org.broadinstitute.hellbender.engine.filters.ReadFilter;
 import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary;
 import org.broadinstitute.hellbender.engine.spark.GATKSparkTool;
+import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
+import org.broadinstitute.hellbender.exceptions.GATKException;
 import org.broadinstitute.hellbender.utils.Utils;
 import org.broadinstitute.hellbender.utils.read.GATKRead;
 import org.broadinstitute.hellbender.utils.read.ReadUtils;
@@ -24,6 +26,7 @@
 import org.broadinstitute.hellbender.utils.read.markduplicates.DuplicationMetrics;
 import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
 import org.broadinstitute.hellbender.utils.read.markduplicates.OpticalDuplicateFinder;
+import org.broadinstitute.hellbender.utils.spark.SparkUtils;
 import picard.cmdline.programgroups.ReadDataManipulationProgramGroup;
 import scala.Tuple2;
 
@@ -64,22 +67,45 @@ public List<ReadFilter> getDefaultReadFilters() {
         return Collections.singletonList(ReadFilterLibrary.ALLOW_ALL_READS);
     }
 
+    /**
+     * Main method for marking duplicates: takes a JavaRDD of GATKRead and an associated SAMFileHeader with corresponding
+     * sorting information and returns a new {@code JavaRDD<GATKRead>} in which all read templates have been marked as duplicates.
+     *
+     * NOTE: This method expects the incoming reads to be grouped by read name (queryname sorted/querygrouped) and for this
+     * to be explicitly set in the provided header. Furthermore, all the reads in a template must be grouped
+     * into the same partition or there may be problems with duplicate marking.
+     * If MarkDuplicates detects that the reads are sorted in some other way, it will perform an extra sort operation first,
+     * so for performance reasons it is preferable to provide this method with reads that are already queryname sorted.
+     *
+     * @param reads input reads to be duplicate marked
+     * @param header header corresponding to the input reads
+     * @param scoringStrategy method by which duplicates are detected
+     * @param opticalDuplicateFinder finder used to identify optical duplicates among the detected duplicates
+     * @param numReducers number of partitions to separate the data into
+     * @param dontMarkUnmappedMates when true, unmapped mates of duplicate fragments will be marked as non-duplicates
+     * @return a JavaRDD of GATKReads where duplicate flags have been set
+     */
     public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFileHeader header, final MarkDuplicatesScoringStrategy scoringStrategy, final OpticalDuplicateFinder opticalDuplicateFinder, final int numReducers, final boolean dontMarkUnmappedMates) {
+        JavaRDD<GATKRead> sortedReadsForMarking;
+        SAMFileHeader headerForTool = header.clone();
+
+        // If the input isn't queryname sorted, sort it before duplicate marking
+        sortedReadsForMarking = querynameSortReadsIfNecessary(reads, numReducers, headerForTool);
 
-        JavaPairRDD<IndexPair<String>, Integer> namesOfNonDuplicates = MarkDuplicatesSparkUtils.transformToDuplicateNames(header, scoringStrategy, opticalDuplicateFinder, reads, numReducers);
+        JavaPairRDD<IndexPair<String>, Integer> namesOfNonDuplicates = MarkDuplicatesSparkUtils.transformToDuplicateNames(headerForTool, scoringStrategy, opticalDuplicateFinder, sortedReadsForMarking, numReducers);
 
         // Here we explicitly repartition the read names of the unmarked reads to match the partitioning of the original bam
         final JavaRDD<Tuple2<String,Integer>> repartitionedReadNames = namesOfNonDuplicates
                 .mapToPair(pair -> new Tuple2<>(pair._1.getIndex(), new Tuple2<>(pair._1.getValue(),pair._2)))
-                .partitionBy(new KnownIndexPartitioner(reads.getNumPartitions()))
+                .partitionBy(new KnownIndexPartitioner(sortedReadsForMarking.getNumPartitions()))
                 .values();
 
         // Here we combine the original bam with the repartitioned unmarked readnames to produce our marked reads
-        return reads.zipPartitions(repartitionedReadNames, (readsIter, readNamesIter) -> {
-            final Map<String, Integer> namesOfNonDuplicateReadsAndOpticalCounts = Utils.stream(readNamesIter).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
+        return sortedReadsForMarking.zipPartitions(repartitionedReadNames, (readsIter, readNamesIter) -> {
+            final Map<String, Integer> namesOfNonDuplicateReadsAndOpticalCounts = Utils.stream(readNamesIter).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (t1, t2) -> {throw new GATKException("Detected multiple mark-duplicates records corresponding to the same read name; this could be the result of read names spanning more than one partition");}));
             return Utils.stream(readsIter).peek(read -> {
                 // Handle reads that have been marked as non-duplicates (which also get tagged with optical duplicate summary statistics)
                 if( namesOfNonDuplicateReadsAndOpticalCounts.containsKey(read.getName())) {
@@ -103,6 +129,21 @@ public static JavaRDD<GATKRead> mark(final JavaRDD<GATKRead> reads, final SAMFil
         });
     }
 
+    /**
+     * Sort reads into queryname order if they are not already queryname sorted or querygrouped
+     */
+    protected static JavaRDD<GATKRead> querynameSortReadsIfNecessary(JavaRDD<GATKRead> reads, int numReducers, SAMFileHeader headerForTool) {
+        JavaRDD<GATKRead> sortedReadsForMarking;
+        if (ReadUtils.isReadNameGroupedBam(headerForTool)) {
+            sortedReadsForMarking = reads;
+        } else {
+            headerForTool.setSortOrder(SAMFileHeader.SortOrder.queryname);
+            JavaRDD<GATKRead> sortedReads = SparkUtils.querynameSortReads(reads, numReducers);
+            sortedReadsForMarking = ReadsSparkSource.putPairsInSamePartition(headerForTool, sortedReads, JavaSparkContext.fromSparkContext(reads.context()));
+        }
+        return sortedReadsForMarking;
+    }
+
     /**
      * A custom partitioner designed to cut down on spark shuffle costs.
      * This is designed such that getPartition(key) is called on a key which corresponds to the already known target partition
diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java
index c8d024d51de..5393b3eb641 100644
--- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java
+++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java
@@ -60,6 +60,11 @@ public int getIndex() {
             this.value = value;
             this.index = index;
         }
+
+        @Override
+        public String toString() {
+            return "indexpair[" + index + "," + value.toString() + "]";
+        }
     }
 
     /**
@@ -167,9 +172,7 @@ private static JavaPairRDD<String, Iterable<IndexPair<GATKRead>>> getReadsGroupe
             keyedReads = spanReadsByKey(indexedReads);
         } else {
             // sort by group and name (incurs a shuffle)
-            JavaPairRDD<String, IndexPair<GATKRead>> keyReadPairs = indexedReads.mapToPair(read -> new Tuple2<>(ReadsKey.keyForRead(
-                    read.getValue()), read));
-            keyedReads = keyReadPairs.groupByKey(numReducers);
+            throw new GATKException(String.format("MarkDuplicatesSparkUtils.mark() requires input reads to be queryname sorted or querygrouped, yet the header indicates the reads are in %s order instead", header.getSortOrder()));
         }
         return keyedReads;
     }
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/ReadQueryNameComparator.java b/src/main/java/org/broadinstitute/hellbender/utils/read/ReadQueryNameComparator.java
new file mode 100644
index 00000000000..865f6b0bd19
--- /dev/null
+++ b/src/main/java/org/broadinstitute/hellbender/utils/read/ReadQueryNameComparator.java
@@ -0,0 +1,61 @@
+package org.broadinstitute.hellbender.utils.read;
+
+import htsjdk.samtools.SAMRecordQueryNameComparator;
+import htsjdk.samtools.SAMTag;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Compares {@link GATKRead} by queryname.
+ * Duplicates the exact ordering of {@link SAMRecordQueryNameComparator}.
+ */
+public class ReadQueryNameComparator implements Comparator<GATKRead>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public int compare(final GATKRead read1, final GATKRead read2) {
+        int cmp = compareReadNames(read1, read2);
+        if (cmp != 0) {
+            return cmp;
+        }
+
+        final boolean r1Paired = read1.isPaired();
+        final boolean r2Paired = read2.isPaired();
+
+        if (r1Paired || r2Paired) {
+            if (!r1Paired) return 1;
+            else if (!r2Paired) return -1;
+            else if (read1.isFirstOfPair() && read2.isSecondOfPair()) return -1;
+            else if (read1.isSecondOfPair() && read2.isFirstOfPair()) return 1;
+        }
+
+        if (read1.isReverseStrand() != read2.isReverseStrand()) {
+            return (read1.isReverseStrand() ? 1 : -1);
+        }
+        if (read1.isSecondaryAlignment() != read2.isSecondaryAlignment()) {
+            return read2.isSecondaryAlignment() ? -1 : 1;
+        }
+        if (read1.isSupplementaryAlignment() != read2.isSupplementaryAlignment()) {
+            return read2.isSupplementaryAlignment() ? -1 : 1;
+        }
+        final Integer hitIndex1 = read1.getAttributeAsInteger(SAMTag.HI.name());
+        final Integer hitIndex2 = read2.getAttributeAsInteger(SAMTag.HI.name());
+        if (hitIndex1 != null) {
+            if (hitIndex2 == null) return 1;
+            else {
+                cmp = hitIndex1.compareTo(hitIndex2);
+                if (cmp != 0) return cmp;
+            }
+        } else if (hitIndex2 != null) return -1;
+        return 0;
+    }
+
+    /**
+     * Compare read names lexicographically without any additional tie-breakers.
+     */
+    public int compareReadNames(final GATKRead read1, final GATKRead read2) {
+        return read1.getName().compareTo(read2.getName());
+    }
+}
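The three-argument Collectors.toMap introduced in mark() above is what turns a silent correctness bug into a loud failure: the merge function runs only when two entries collide on the same key, i.e. when a read name appears twice in one partition's name table. A minimal, self-contained sketch of that pattern using plain JDK types (the read names here are made up for illustration):

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToMapMergeFunctionDemo {
        public static void main(String[] args) {
            // Keys are read names; values stand in for the optical-duplicate counts carried by the real code.
            Map<String, Integer> byName = Stream.of("HWI-1:101", "HWI-1:102")
                    .collect(Collectors.toMap(
                            name -> name,      // key mapper
                            name -> 0,         // value mapper
                            (a, b) -> {        // merge function: invoked only on duplicate keys
                                throw new IllegalStateException("duplicate read name in one partition");
                            }));
            System.out.println(byName); // prints both entries (map order unspecified)
        }
    }
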
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/spark/SparkUtils.java b/src/main/java/org/broadinstitute/hellbender/utils/spark/SparkUtils.java
index af897601fa1..d281033c79f 100644
--- a/src/main/java/org/broadinstitute/hellbender/utils/spark/SparkUtils.java
+++ b/src/main/java/org/broadinstitute/hellbender/utils/spark/SparkUtils.java
@@ -17,6 +17,7 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
+import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSource;
 import org.broadinstitute.hellbender.exceptions.UserException;
 import org.broadinstitute.hellbender.utils.read.*;
 import org.broadinstitute.hellbender.utils.Utils;
@@ -136,6 +137,31 @@ public static JavaRDD<GATKRead> coordinateSortReads(final JavaRDD<GATKRead> read
         // do a total sort so that all the reads in partition i are less than those in partition i+1
         final Comparator<GATKRead> comparator = new ReadCoordinateComparator(header);
         final JavaPairRDD<GATKRead, Void> readVoidPairs;
+        final JavaRDD<GATKRead> output;
+        if (numReducers > 0) {
+            readVoidPairs = rddReadPairs.sortByKey(comparator, true, numReducers);
+            output = ReadsSparkSource.putPairsInSamePartition(header, readVoidPairs.keys(), new JavaSparkContext(readVoidPairs.context()));
+        } else {
+            readVoidPairs = rddReadPairs.sortByKey(comparator);
+            output = readVoidPairs.keys();
+        }
+        return output;
+    }
+
+    /**
+     * Sorts the given reads in queryname sort order.
+     * @param reads the reads to sort
+     * @param numReducers the number of reducers to use; a value of 0 means use the default number of reducers
+     * @return a sorted RDD of reads
+     */
+    public static JavaRDD<GATKRead> querynameSortReads(final JavaRDD<GATKRead> reads, final int numReducers) {
+        // Turn into key-value pairs so we can sort (by key). Values are null so there is no overhead in the amount
+        // of data going through the shuffle.
+        final JavaPairRDD<GATKRead, Void> rddReadPairs = reads.mapToPair(read -> new Tuple2<>(read, (Void) null));
+
+        // do a total sort so that all the reads in partition i are less than those in partition i+1
+        final Comparator<GATKRead> comparator = new ReadQueryNameComparator();
+        final JavaPairRDD<GATKRead, Void> readVoidPairs;
         if (numReducers > 0) {
             readVoidPairs = rddReadPairs.sortByKey(comparator, true, numReducers);
         } else {
diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java
index e9b78f3361a..b7061683d3d 100644
--- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java
@@ -295,7 +295,7 @@ public void testPutPairsInSamePartition(int numPairs, int numPartitions, int num
         header.setSortOrder(SAMFileHeader.SortOrder.queryname);
         JavaRDD<GATKRead> reads = ctx.parallelize(createPairedReads(ctx, header, numPairs, numReadsInPair), numPartitions);
         ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx);
-        JavaRDD<GATKRead> pairedReads = readsSparkSource.putPairsInSamePartition(header, reads);
+        JavaRDD<GATKRead> pairedReads = ReadsSparkSource.putPairsInSamePartition(header, reads, ctx);
 
         List<List<GATKRead>> partitions = pairedReads.mapPartitions((FlatMapFunction<Iterator<GATKRead>, List<GATKRead>>) it -> Iterators.singletonIterator(Lists.newArrayList(it))).collect();
         assertEquals(partitions.size(), numPartitions);
@@ -336,6 +336,6 @@ public void testReadsPairsSpanningMultiplePartitionsCrash() throws IOException {
 
         JavaRDD<GATKRead> problemReads = ctx.parallelize(reads, 5);
         ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx);
-        readsSparkSource.putPairsInSamePartition(header, problemReads);
+        ReadsSparkSource.putPairsInSamePartition(header, problemReads, ctx);
     }
 }
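For reference, this is the shape of the total-sort idiom querynameSortReads relies on, reduced to plain Spark types (String stands in for GATKRead; the class below is illustrative, not GATK API). The two essentials are that the comparator must be Serializable so Spark can ship it to executors, which is why ReadQueryNameComparator implements Serializable above, and that the pair values are null so the shuffle moves nothing beyond the keys:

    import java.io.Serializable;
    import java.util.Comparator;

    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    public class TotalSortSketch {
        // Spark serializes the comparator and sends it to executors, so it must be Serializable.
        public static class LexicographicComparator implements Comparator<String>, Serializable {
            private static final long serialVersionUID = 1L;
            @Override
            public int compare(final String a, final String b) {
                return a.compareTo(b);
            }
        }

        public static JavaRDD<String> totalSort(final JavaRDD<String> names, final int numReducers) {
            // Null values mean the shuffle carries only the keys being sorted.
            return names.mapToPair(n -> new Tuple2<>(n, (Void) null))
                        .sortByKey(new LexicographicComparator(), true, numReducers)
                        .keys();
        }
    }
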
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java
index 82355121ec3..7179dfea7b3 100644
--- a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java
+++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java
@@ -3,27 +3,19 @@
 import com.google.api.client.util.Lists;
 import com.google.common.collect.ImmutableList;
 import htsjdk.samtools.*;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
 import org.broadinstitute.hellbender.engine.spark.SparkContextFactory;
-import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink;
 import org.broadinstitute.hellbender.utils.read.*;
 import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
-import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey;
+import org.broadinstitute.hellbender.utils.read.markduplicates.OpticalDuplicateFinder;
 import org.broadinstitute.hellbender.GATKBaseTest;
-import org.broadinstitute.hellbender.utils.test.SamAssertionUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import scala.Tuple2;
-import scala.collection.Seq;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.*;
-import java.util.stream.Collectors;
 
 public class MarkDuplicatesSparkUtilsUnitTest extends GATKBaseTest {
     @Test(groups = "spark")
@@ -66,4 +58,61 @@ private static Tuple2<String, Iterable<GATKRead>> pairIterable(String key, GATKR
         return new Tuple2<>(key, ImmutableList.copyOf(reads));
     }
 
+    @Test
+    // Test asserting that duplicate marking is sort-order agnostic; specifically, that when reads are scrambled across
+    // partitions in the input, all reads in a group are duplicate marked together, just as they are for queryname-sorted bams
+    public void testSortOrderPartitioningCorrectness() throws IOException {
+
+        JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
+        JavaRDD<GATKRead> unsortedReads = generateReadsWithDuplicates(10000, 3, ctx, 99, true);
+        JavaRDD<GATKRead> pairedEndsQueryGrouped = generateReadsWithDuplicates(10000, 3, ctx, 1, false); // Use only one partition to avoid having to do edge fixing.
+
+        SAMFileHeader unsortedHeader = hg19Header.clone();
+        unsortedHeader.setSortOrder(SAMFileHeader.SortOrder.unsorted);
+        SAMFileHeader sortedHeader = hg19Header.clone();
+        sortedHeader.setSortOrder(SAMFileHeader.SortOrder.queryname);
+
+        // Using the header flagged as unsorted will result in the reads being sorted again
+        JavaRDD<GATKRead> unsortedReadsMarked = MarkDuplicatesSpark.mark(unsortedReads, unsortedHeader, MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES, new OpticalDuplicateFinder(), 100, true);
+        JavaRDD<GATKRead> sortedReadsMarked = MarkDuplicatesSpark.mark(pairedEndsQueryGrouped, sortedHeader, MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES, new OpticalDuplicateFinder(), 1, true);
+
+        Iterator<GATKRead> sortedReadsFinal = sortedReadsMarked.sortBy(GATKRead::commonToString, false, 1).collect().iterator();
+        Iterator<GATKRead> unsortedReadsFinal = unsortedReadsMarked.sortBy(GATKRead::commonToString, false, 1).collect().iterator();
+
+        // Compare the output reads to ensure they are all duplicate marked correctly
+        while (sortedReadsFinal.hasNext()) {
+            GATKRead read1 = sortedReadsFinal.next();
+            GATKRead read2 = unsortedReadsFinal.next();
+            Assert.assertEquals(read1.getName(), read2.getName());
+            Assert.assertEquals(read1.isDuplicate(), read2.isDuplicate());
+        }
+    }
+
+    // This helper method generates groups of reads to be duplicate marked. It does this by generating numDuplicatesPerGroup
+    // pairs of reads starting at randomly selected start locations. The start locations are random so that if the resulting RDD
+    // is coordinate sorted it is more or less guaranteed that a large portion of the reads will reside on separate partitions
+    // from their mates. It also handles sorting of the reads into either queryname or coordinate order.
+    private JavaRDD<GATKRead> generateReadsWithDuplicates(int numReadGroups, int numDuplicatesPerGroup, JavaSparkContext ctx, int numPartitions, boolean coordinate) {
+        int readNameCounter = 0;
+        SAMRecordSetBuilder samRecordSetBuilder = new SAMRecordSetBuilder(true, SAMFileHeader.SortOrder.coordinate,
+                true, SAMRecordSetBuilder.DEFAULT_CHROMOSOME_LENGTH, SAMRecordSetBuilder.DEFAULT_DUPLICATE_SCORING_STRATEGY);
+
+        Random rand = new Random(10);
+        for (int i = 0; i < numReadGroups; i++) {
+            int start1 = rand.nextInt(SAMRecordSetBuilder.DEFAULT_CHROMOSOME_LENGTH);
+            int start2 = rand.nextInt(SAMRecordSetBuilder.DEFAULT_CHROMOSOME_LENGTH);
+            for (int j = 0; j < numDuplicatesPerGroup; j++) {
+                samRecordSetBuilder.addPair("READ" + readNameCounter++, 0, start1, start2);
+            }
+        }
+        List<SAMRecord> records = Lists.newArrayList(samRecordSetBuilder.getRecords());
+        if (coordinate) {
+            records.sort(new SAMRecordCoordinateComparator());
+        } else {
+            records.sort(new SAMRecordQueryNameComparator());
+        }
+
+        return ctx.parallelize(records, numPartitions).map(SAMRecordToGATKReadAdapter::new);
+    }
+
 }
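The tests above hinge on the invariant that, after the queryname sort, every template's reads are contiguous and therefore land in one partition. A throwaway sortedness check using the comparator introduced in this PR (sorted-by-name implies name-grouped; the class and helper below are hypothetical, not part of GATK):

    import java.util.List;

    import org.broadinstitute.hellbender.utils.read.GATKRead;
    import org.broadinstitute.hellbender.utils.read.ReadQueryNameComparator;

    public final class QuerynameOrderCheck {
        private static final ReadQueryNameComparator BY_NAME = new ReadQueryNameComparator();

        // True when no read's name sorts before its predecessor's, which guarantees that
        // all reads sharing a name are contiguous in the list.
        static boolean isQuerynameSorted(final List<GATKRead> reads) {
            for (int i = 1; i < reads.size(); i++) {
                if (BY_NAME.compareReadNames(reads.get(i - 1), reads.get(i)) > 0) {
                    return false;
                }
            }
            return true;
        }
    }
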
diff --git a/src/test/java/org/broadinstitute/hellbender/utils/read/ReadQueryNameComparatorUnitTest.java b/src/test/java/org/broadinstitute/hellbender/utils/read/ReadQueryNameComparatorUnitTest.java
new file mode 100644
index 00000000000..253943d4512
--- /dev/null
+++ b/src/test/java/org/broadinstitute/hellbender/utils/read/ReadQueryNameComparatorUnitTest.java
@@ -0,0 +1,134 @@
+package org.broadinstitute.hellbender.utils.read;
+
+import htsjdk.samtools.SAMFileHeader;
+import htsjdk.samtools.SAMRecordQueryNameComparator;
+import htsjdk.samtools.SAMTag;
+import org.broadinstitute.hellbender.GATKBaseTest;
+import org.broadinstitute.hellbender.engine.ReadsDataSource;
+import org.broadinstitute.hellbender.utils.io.IOUtils;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class ReadQueryNameComparatorUnitTest extends GATKBaseTest {
+
+    public static final SAMFileHeader HEADER = ArtificialReadUtils.createArtificialSamHeader();
+    public static final String NAME = "NAME";
+
+    /**
+     * Tests that the ordering produced by {@link ReadQueryNameComparator} matches queryname ordering
+     * as produced by htsjdk's {@link SAMRecordQueryNameComparator} for a representative selection of reads. Ignores
+     * differences in tie-breaking done for reads with the same name; it just asserts that the reads are
+     * queryname-sorted according to htsjdk, including unmapped reads with and without an assigned position.
+     */
+    @Test
+    public void testComparatorOrderingMatchesHtsjdkFileOrdering() throws IOException {
+        final String inputBam = publicTestDir + "org/broadinstitute/hellbender/utils/read/comparator_test_with_unmapped.bam";
+        final List<GATKRead> reads = new ArrayList<>();
+        SAMFileHeader header;
+
+        try ( final ReadsDataSource readsSource = new ReadsDataSource(IOUtils.getPath(inputBam)) ) {
+            header = readsSource.getHeader();
+
+            for ( GATKRead read : readsSource ) {
+                reads.add(read);
+            }
+        }
+
+        // Randomize the ordering, then re-sort
+        Collections.shuffle(reads);
+        reads.sort(new ReadQueryNameComparator());
+
+        final SAMRecordQueryNameComparator samComparator = new SAMRecordQueryNameComparator();
+        GATKRead previousRead = null;
+        for ( final GATKRead currentRead : reads ) {
+            if ( previousRead != null ) {
+                Assert.assertTrue(samComparator.compare(previousRead.convertToSAMRecord(header), currentRead.convertToSAMRecord(header)) <= 0,
+                        "Reads are out of order: " + previousRead + " and " + currentRead);
+            }
+            previousRead = currentRead;
+        }
+    }
+
+    @DataProvider
+    public Object[][] getNames(){
+        return new Object[][]{
+                {"A", "B", -1},
+                {"A", "A", 0},
+                {"AA", "A", 1},
+                {"1", "10", -1},
+                {"2", "10", 1}
+        };
+    }
+
+    @Test(dataProvider = "getNames")
+    public void testCompareNames(String firstName, String secondName, int expected) throws Exception {
+        ReadQueryNameComparator comp = new ReadQueryNameComparator();
+        GATKRead first = getRead(firstName);
+        GATKRead second = getRead(secondName);
+        Assert.assertEquals(comp.compareReadNames(first, second), expected);
+        Assert.assertEquals(comp.compareReadNames(second, first), -expected);
+        Assert.assertEquals(comp.compareReadNames(first, first), 0);
+        Assert.assertEquals(comp.compareReadNames(second, second), 0);
+    }
+
+    private static GATKRead getRead(String name) {
+        final GATKRead read = ArtificialReadUtils.createArtificialRead(HEADER, name, 1, 100, 10);
+        return read;
+    }
+
+    @DataProvider
+    public Iterator<Object[]> getReads(){
+        final GATKRead differentName = getRead(NAME + NAME);
+
+        final GATKRead unpaired = getRead(NAME);
+        unpaired.setIsPaired(false);
+
+        final GATKRead paired = getRead(NAME);
+        paired.setIsPaired(true);
+
+        final GATKRead firstOfPair = getRead(NAME);
+        firstOfPair.setIsFirstOfPair();
+
+        final GATKRead secondOfPair = getRead(NAME);
+        secondOfPair.setIsSecondOfPair();
+
+        final GATKRead reverseStrand = getRead(NAME);
+        reverseStrand.setIsReverseStrand(true);
+
+        final GATKRead supplementary = getRead(NAME);
+        supplementary.setIsSupplementaryAlignment(true);
+
+        final GATKRead secondary = getRead(NAME);
+        secondary.setIsSecondaryAlignment(true);
+
+        final GATKRead tagHI1 = getRead(NAME);
+        tagHI1.setAttribute(SAMTag.HI.name(), 1);
+
+        final GATKRead tagHI2 = getRead(NAME);
+        tagHI2.setAttribute(SAMTag.HI.name(), 2);
+
+        List<GATKRead> reads = Arrays.asList(differentName, unpaired, paired, firstOfPair, secondOfPair, reverseStrand, supplementary, secondary, tagHI1, tagHI2);
+        List<Object[]> tests = new ArrayList<>();
+        for (GATKRead left : reads){
+            for (GATKRead right : reads){
+                tests.add(new Object[]{left, right});
+            }
+        }
+        return tests.iterator();
+    }
+
+    @Test(dataProvider = "getReads")
+    public void testTieBreakers(GATKRead left, GATKRead right){
+        ReadQueryNameComparator readComparator = new ReadQueryNameComparator();
+        SAMRecordQueryNameComparator samComparator = new SAMRecordQueryNameComparator();
+        Assert.assertEquals(readComparator.compare(left, right), samComparator.compare(left.convertToSAMRecord(HEADER), right.convertToSAMRecord(HEADER)));
+    }
+
+}
\ No newline at end of file
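Taken together, this PR changes the calling pattern for MarkDuplicatesSpark.mark: a caller no longer has to queryname-sort before invoking it. If the header reports any other order, mark() clones the header, sorts with SparkUtils.querynameSortReads, repairs pairs split across partition boundaries with putPairsInSamePartition, and only then marks. A sketch of the enabled usage (the wrapper class and argument values below are illustrative; the mark() signature and constants come from the diff above):

    import htsjdk.samtools.SAMFileHeader;
    import org.apache.spark.api.java.JavaRDD;
    import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark;
    import org.broadinstitute.hellbender.utils.read.GATKRead;
    import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
    import org.broadinstitute.hellbender.utils.read.markduplicates.OpticalDuplicateFinder;

    public class MarkUnsortedSketch {
        public static JavaRDD<GATKRead> markReads(JavaRDD<GATKRead> reads, SAMFileHeader header) {
            // header may report e.g. coordinate order; mark() now sorts internally
            // instead of mis-grouping or requiring the caller to pre-sort.
            return MarkDuplicatesSpark.mark(reads, header,
                    MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES,
                    new OpticalDuplicateFinder(),
                    /* numReducers = */ 100,
                    /* dontMarkUnmappedMates = */ true);
        }
    }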