diff --git a/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala
index 73bc1a3af5..b83a5dc370 100644
--- a/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala
+++ b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala
@@ -16,10 +16,10 @@
 package edu.berkeley.cs.amplab.adam.rdd
 
 import org.apache.spark.rdd.RDD
-import edu.berkeley.cs.amplab.adam.avro.{ADAMPileup, Base}
+import edu.berkeley.cs.amplab.adam.avro.{ADAMRecord, ADAMPileup, Base}
 import edu.berkeley.cs.amplab.adam.util.SparkFunSuite
 import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
-import edu.berkeley.cs.amplab.adam.models.ADAMRod
+import scala.util.Random
 
 class AdamRDDFunctionsSuite extends SparkFunSuite {
 
@@ -163,4 +163,26 @@ class AdamRDDFunctionsSuite extends SparkFunSuite {
     assert(coverage > 1.99 && coverage < 2.01)
   }
 
+  sparkTest("sorting reads") {
+    val random = new Random("sorting".hashCode)
+    val numReadsToCreate = 1000
+    val reads = for (i <- 0 until numReadsToCreate) yield {
+      val mapped = random.nextBoolean()
+      val builder = ADAMRecord.newBuilder().setReadMapped(mapped)
+      if (mapped) {
+        builder.setReferenceId(random.nextInt(numReadsToCreate / 10)).setStart(random.nextInt(1000000))
+      }
+      builder.build()
+    }
+    val rdd = sc.parallelize(reads)
+    val sortedReads = rdd.adamSortReadsByReferencePosition().collect().zipWithIndex
+    val (mapped, unmapped) = sortedReads.partition(_._1.getReadMapped)
+    // Make sure that all the unmapped reads are placed at the end
+    assert(unmapped.forall(p => p._2 > mapped.last._2))
+    // Make sure that we appropriately sorted the reads: referenceId first, then start on ties
+    val expectedSortedReads = mapped.sortWith((a, b) => a._1.getReferenceId < b._1.getReferenceId ||
+      (a._1.getReferenceId == b._1.getReferenceId && a._1.getStart < b._1.getStart))
+    assert(expectedSortedReads === mapped)
+  }
+
 }
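
For reference, a minimal collection-based sketch of the ordering the new test asserts, under the assumption that adamSortReadsByReferencePosition orders mapped reads by (referenceId, start) and places unmapped reads at the end; the helper name sortByReferencePosition below is hypothetical and not part of ADAM's API:

    import edu.berkeley.cs.amplab.adam.avro.ADAMRecord

    // Hypothetical local analogue of the RDD-based adamSortReadsByReferencePosition:
    // mapped reads are sorted by (referenceId, start); unmapped reads go last.
    def sortByReferencePosition(reads: Seq[ADAMRecord]): Seq[ADAMRecord] = {
      val (mapped, unmapped) = reads.partition(_.getReadMapped)
      mapped.sortBy(r => (r.getReferenceId.intValue, r.getStart.longValue)) ++ unmapped
    }

Sorting by a composite key yields a total order on mapped reads, which is why the test's expected-order comparator checks referenceId first and falls back to start only on ties, rather than chaining the two comparisons with &&.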