diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala
index 32d6b1364d..7153ae4403 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala
@@ -20,13 +20,10 @@ package org.bdgenomics.adam.rdd.read
 import org.bdgenomics.utils.misc.Logging
 import org.apache.spark.rdd.RDD
 import org.bdgenomics.adam.instrumentation.Timers._
-import org.bdgenomics.adam.models.{
-  RecordGroupDictionary,
-  ReferencePosition
-}
+import org.bdgenomics.adam.models.{ RecordGroupDictionary, ReferencePosition }
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.fragment.FragmentDataset
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }
+import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment, Strand }
 
 private[rdd] object MarkDuplicates extends Serializable with Logging {
 
@@ -96,19 +93,24 @@
     recordGroups: RecordGroupDictionary): RDD[SingleReadBucket] = {
     checkRecordGroups(recordGroups)
 
+    def positionForStrand(defaultPos: Option[ReferencePosition], otherPos: Option[ReferencePosition], strand: Strand) = {
+      defaultPos.filter(_.strand == strand).orElse(otherPos.filter(_.strand == strand).orElse(defaultPos))
+    }
+
     // Group by library and left position
     def leftPositionAndLibrary(p: (ReferencePositionPair, SingleReadBucket),
                                rgd: RecordGroupDictionary): (Option[ReferencePosition], String) = {
+      val leftPosition = positionForStrand(p._1.read1refPos, p._1.read2refPos, Strand.FORWARD)
       if (p._2.allReads.head.getRecordGroupName != null) {
-        (p._1.read1refPos, rgd(p._2.allReads.head.getRecordGroupName).library.getOrElse(null))
+        (leftPosition, rgd(p._2.allReads.head.getRecordGroupName).library.orNull)
       } else {
-        (p._1.read1refPos, null)
+        (leftPosition, null)
       }
     }
 
     // Group by right position
     def rightPosition(p: (ReferencePositionPair, SingleReadBucket)): Option[ReferencePosition] = {
-      p._1.read2refPos
+      positionForStrand(p._1.read2refPos, p._1.read1refPos, Strand.REVERSE)
    }
 
     rdd.keyBy(ReferencePositionPair(_))
@@ -166,4 +168,5 @@
       scoreBucket(x._2) - scoreBucket(y._2)
     }
   }
+
 }
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala
index d8db373be0..37f02e455c 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/MarkDuplicatesSuite.scala
@@ -309,4 +309,17 @@
     assert(nonDups.size == 2 && nonDups.forall(p => p.getReadName.toString == "best"))
     assert(dups.forall(p => p.getReadName.startsWith("poor")))
   }
+
+  sparkTest("inverse pairs") {
+    val firstPair = createPair("0", 100, 251, "0", 1100, 1251, "pair1")
+    val secondPair = createPair("0", 1100, 1251, "0", 100, 251, "pair2")
+    secondPair.head.setReadNegativeStrand(true)
+    secondPair.head.setMateNegativeStrand(false)
+    secondPair(1).setReadNegativeStrand(false)
+    secondPair(1).setMateNegativeStrand(true)
+
+    val marked = markDuplicateFragments(firstPair ++ secondPair: _*)
+    val (dups, nonDups) = marked.partition(_.getDuplicateRead)
+    assert(dups.size == 2)
+  }
 }
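
Reviewer note: the core of this patch is positionForStrand, which keys each pair by its forward-strand and reverse-strand positions instead of by read1/read2 order, so "inverse" pairs (the same fragment with the reads' strands swapped) collapse onto one grouping key. Below is a minimal standalone sketch of that selection logic; Pos, the Strand ADT, and PositionForStrandSketch are hypothetical simplified stand-ins for ADAM's ReferencePosition and the Avro-generated Strand enum, not part of this patch.

object PositionForStrandSketch {
  sealed trait Strand
  case object FORWARD extends Strand
  case object REVERSE extends Strand

  // Simplified stand-in: ADAM's ReferencePosition also carries a
  // reference name; only start and strand matter for this sketch.
  case class Pos(start: Long, strand: Strand)

  // Mirrors the patch: prefer whichever of the two positions lies on
  // `strand`, falling back to defaultPos when neither read matches.
  def positionForStrand(defaultPos: Option[Pos], otherPos: Option[Pos], strand: Strand): Option[Pos] =
    defaultPos.filter(_.strand == strand)
      .orElse(otherPos.filter(_.strand == strand).orElse(defaultPos))

  def main(args: Array[String]): Unit = {
    // pair1: read1 forward at 100, read2 reverse at 1100
    val (p1r1, p1r2) = (Some(Pos(100L, FORWARD)), Some(Pos(1100L, REVERSE)))
    // pair2, the inverse orientation: read1 reverse at 1100, read2 forward at 100
    val (p2r1, p2r2) = (Some(Pos(1100L, REVERSE)), Some(Pos(100L, FORWARD)))

    // left key  = positionForStrand(read1, read2, FORWARD)
    // right key = positionForStrand(read2, read1, REVERSE)
    // Both pairs now yield the key (Pos(100, FORWARD), Pos(1100, REVERSE)),
    // so the second pair is detected as a duplicate of the first.
    assert(positionForStrand(p1r1, p1r2, FORWARD) == positionForStrand(p2r1, p2r2, FORWARD))
    assert(positionForStrand(p1r2, p1r1, REVERSE) == positionForStrand(p2r2, p2r1, REVERSE))
    println("inverse pairs share the same grouping key")
  }
}

Under the old keying, pair1 grouped on left position 100 and right position 1100 while pair2 grouped on (1100, 100), so the duplicate was missed; the "inverse pairs" test above exercises exactly this case.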