diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/RecordGroupDictionary.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/RecordGroupDictionary.scala index 1124f5ba31..70a7a46439 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/RecordGroupDictionary.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/RecordGroupDictionary.scala @@ -53,7 +53,7 @@ object RecordGroupDictionary { * * @param recordGroups A seq of record groups to populate the dictionary. * - * @throws IllegalArgumentError Throws an assertion error if there are multiple record + * @throws IllegalArgumentException Throws an assertion error if there are multiple record * groups with the same name. */ case class RecordGroupDictionary(recordGroups: Seq[RecordGroup]) { diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala index c4480d6c1b..444de80386 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala @@ -64,13 +64,11 @@ private[rdd] object MarkDuplicates extends Serializable with Logging { } def apply(rdd: AlignmentRecordRDD): RDD[AlignmentRecord] = { - markBuckets(rdd.groupReadsByFragment(), rdd.recordGroups) .flatMap(_.allReads) } def apply(rdd: FragmentRDD): RDD[Fragment] = { - markBuckets(rdd.rdd.map(f => SingleReadBucket(f)), rdd.recordGroups) .map(_.toFragment) } @@ -96,14 +94,20 @@ private[rdd] object MarkDuplicates extends Serializable with Logging { recordGroups: RecordGroupDictionary): RDD[SingleReadBucket] = { checkRecordGroups(recordGroups) + // Gets the library of a SingleReadBucket + def getLibrary(singleReadBucket: SingleReadBucket): Option[String] = { + val recordGroupName = singleReadBucket.allReads.head.getRecordGroupName + recordGroups.recordGroupMap.get(recordGroupName).flatMap(v => { + val (recordGroup, index) = v + recordGroup.library + }) + } + // Group by library and left position def leftPositionAndLibrary(p: (ReferencePositionPair, SingleReadBucket), - rgd: RecordGroupDictionary): (Option[ReferencePosition], String) = { - if (p._2.allReads.head.getRecordGroupName != null) { - (p._1.read1refPos, rgd(p._2.allReads.head.getRecordGroupName).library.getOrElse(null)) - } else { - (p._1.read1refPos, null) - } + rgd: RecordGroupDictionary): (Option[ReferencePosition], Option[String]) = { + val (refPosPair, singleReadBucket) = p + (refPosPair.read1refPos, getLibrary(singleReadBucket)) } // Group by right position