diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala
index c940319179..fe8d033c83 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala
@@ -63,7 +63,6 @@ object ADAMMain extends Logging {
     PrintADAM,
     PrintGenes,
     FlagStat,
-    VizReads,
     PrintTags,
     ListDict,
     SummarizeGenotypes,
diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala
index f19a3c5179..9a7feb64e2 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala
@@ -28,7 +28,6 @@
 import org.bdgenomics.adam.projections.Projection
 import org.bdgenomics.adam.projections.AlignmentRecordField._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.BroadcastRegionJoin
-import org.bdgenomics.adam.rich.ReferenceMappingContext._
 import org.bdgenomics.formats.avro.AlignmentRecord
 import scala.io._
@@ -90,8 +89,8 @@ class CalculateDepth(protected val args: CalculateDepthArgs) extends ADAMSparkCo
     val variantNames = vcf.collect().toMap
 
     val joinedRDD: RDD[(ReferenceRegion, AlignmentRecord)] =
-      if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions, mappedRDD)
-      else BroadcastRegionJoin.partitionAndJoin(sc, variantPositions, mappedRDD)
+      if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))
+      else BroadcastRegionJoin.partitionAndJoin(sc, variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))
 
     val depths: RDD[(ReferenceRegion, Int)] =
       joinedRDD.map { case (region, record) => (region, 1) }.reduceByKey(_ + _).sortByKey()
diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/VizReads.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/VizReads.scala
deleted file mode 100644
index edf4bffbf4..0000000000
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/VizReads.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.cli
-
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.SparkContext
-import org.bdgenomics.adam.models.VariantContext
-import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.models.{ OrderedTrackedLayout, ReferenceRegion }
-import org.bdgenomics.adam.projections.AlignmentRecordField._
-import org.bdgenomics.adam.projections.Projection
-import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rich.ReferenceMappingContext._
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Genotype, GenotypeAllele }
-import org.fusesource.scalate.TemplateEngine
-import org.json4s._
-import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
-import org.scalatra.json._
-import org.scalatra.ScalatraServlet
-
-object VizReads extends ADAMCommandCompanion {
-  val commandName: String = "viz"
-  val commandDescription: String = "Generates images from sections of the genome"
-
-  var refName = ""
-  var inputPath = ""
-  var reads: RDD[AlignmentRecord] = null
-  var variants: RDD[Genotype] = null
-
-  val trackHeight = 10
-  val width = 1200
-  val height = 400
-  val base = 50
-
-  def apply(cmdLine: Array[String]): ADAMCommand = {
-    new VizReads(Args4j[VizReadsArgs](cmdLine))
-  }
-
-  def printTrackJson(layout: OrderedTrackedLayout[AlignmentRecord]): List[TrackJson] = {
-    var tracks = new scala.collection.mutable.ListBuffer[TrackJson]
-
-    // draws a box for each read, in the appropriate track
-    for ((rec, track) <- layout.trackAssignments) {
-      val aRec = rec.asInstanceOf[AlignmentRecord]
-      tracks += new TrackJson(aRec.getReadName, aRec.getStart, aRec.getEnd, track)
-    }
-    tracks.toList
-  }
-
-  def printVariationJson(layout: OrderedTrackedLayout[Genotype]): List[VariationJson] = {
-    var tracks = new scala.collection.mutable.ListBuffer[VariationJson]
-
-    for ((rec, track) <- layout.trackAssignments) {
-      val aRec = rec.asInstanceOf[Genotype]
-      val referenceAllele = aRec.getVariant.getReferenceAllele
-      val alternateAllele = aRec.getVariant.getAlternateAllele
-      tracks += new VariationJson(aRec.getVariant.getContig.getContigName, aRec.getAlleles.mkString(" / "), aRec.getVariant.getStart, aRec.getVariant.getEnd, track)
-    }
-    tracks.toList
-  }
-
-  def printJsonFreq(array: Array[AlignmentRecord], region: ReferenceRegion): List[FreqJson] = {
-    val freqMap = new java.util.TreeMap[Long, Long]
-    var freqBuffer = new scala.collection.mutable.ListBuffer[FreqJson]
-
-    // initiates map with 0 values
-    var i0 = region.start
-    while (i0 <= region.end) {
-      freqMap.put(i0, 0)
-      i0 += 1
-    }
-
-    // creates a point for each base showing its frequency
-    for (rec <- array) {
-      val aRec = rec.asInstanceOf[AlignmentRecord]
-      var i = aRec.getStart
-      while (i <= aRec.getEnd) {
-        if (i >= region.start && i <= region.end)
-          freqMap.put(i, freqMap(i) + 1)
-        i += 1
-      }
-    }
-
-    // convert to list of FreqJsons
-    val iter = freqMap.keySet.iterator
-    var key = 0L
-    while (iter.hasNext) {
-      key = iter.next()
-      freqBuffer += FreqJson(key, freqMap(key))
-    }
-
-    freqBuffer.toList
-  }
-}
-
-case class TrackJson(readName: String, start: Long, end: Long, track: Long)
-case class VariationJson(contigName: String, alleles: String, start: Long, end: Long, track: Long)
-case class FreqJson(base: Long, freq: Long)
-
-class VizReadsArgs extends Args4jBase with ParquetArgs {
-  @Argument(required = true, metaVar = "INPUT", usage = "The ADAM Records file to view", index = 0)
-  var inputPath: String = null
-
-  @Argument(required = true, metaVar = "REFNAME", usage = "The reference to view", index = 1)
-  var refName: String = null
-
-  @Args4jOption(required = false, name = "-port", usage = "The port to bind to for visualization. The default is 8080.")
-  var port: Int = 8080
-}
-
-class VizServlet extends ScalatraServlet with JacksonJsonSupport {
-  protected implicit val jsonFormats: Formats = DefaultFormats
-  var regInfo = ReferenceRegion(VizReads.refName, 0, 200)
-  var filteredLayout: OrderedTrackedLayout[AlignmentRecord] = null
-  var filteredArray: Array[AlignmentRecord] = null
-
-  get("/?") {
-    redirect(url("reads"))
-  }
-
-  get("/reads/?") {
-    contentType = "text/html"
-
-    filteredLayout = new OrderedTrackedLayout(VizReads.reads.filterByOverlappingRegion(regInfo).collect())
-    val templateEngine = new TemplateEngine
-    templateEngine.layout("adam-cli/src/main/webapp/WEB-INF/layouts/reads.ssp",
-      Map("regInfo" -> (regInfo.referenceName, regInfo.start.toString, regInfo.end.toString),
-        "width" -> VizReads.width.toString,
-        "base" -> VizReads.base.toString,
-        "numTracks" -> filteredLayout.numTracks.toString,
-        "trackHeight" -> VizReads.trackHeight.toString))
-  }
-
-  get("/reads/:ref") {
-    contentType = formats("json")
-
-    regInfo = ReferenceRegion(params("ref"), params("start").toLong, params("end").toLong)
-    filteredLayout = new OrderedTrackedLayout(VizReads.reads.filterByOverlappingRegion(regInfo).collect())
-    VizReads.printTrackJson(filteredLayout)
-  }
-
-  get("/freq") {
-    contentType = "text/html"
-
-    filteredArray = VizReads.reads.filterByOverlappingRegion(regInfo).collect()
-    val templateEngine = new TemplateEngine
-    templateEngine.layout("adam-cli/src/main/webapp/WEB-INF/layouts/freq.ssp",
-      Map("regInfo" -> (regInfo.referenceName, regInfo.start.toString, regInfo.end.toString),
-        "width" -> VizReads.width.toString,
-        "height" -> VizReads.height.toString,
-        "base" -> VizReads.base.toString))
-  }
-
-  get("/freq/:ref") {
-    contentType = formats("json")
-
-    regInfo = ReferenceRegion(params("ref"), params("start").toLong, params("end").toLong)
-    filteredArray = VizReads.reads.filterByOverlappingRegion(regInfo).collect()
-    VizReads.printJsonFreq(filteredArray, regInfo)
-  }
-
-  get("/variants/?") {
-    contentType = "text/html"
-
-    val input = VizReads.variants.filterByOverlappingRegion(regInfo).collect()
-    val filteredGenotypeTrack = new OrderedTrackedLayout(input)
-    val templateEngine = new TemplateEngine
-    val displayMap = Map("regInfo" -> (regInfo.referenceName, regInfo.start.toString, regInfo.end.toString),
-      "width" -> VizReads.width.toString,
-      "base" -> VizReads.base.toString,
-      "numTracks" -> filteredGenotypeTrack.numTracks.toString,
-      "trackHeight" -> VizReads.trackHeight.toString)
-    templateEngine.layout("adam-cli/src/main/webapp/WEB-INF/layouts/variants.ssp",
-      displayMap)
-  }
-
-  get("/variants/:ref") {
-    contentType = formats("json")
-
-    regInfo = ReferenceRegion(params("ref"), params("start").toLong, params("end").toLong)
-    val input = VizReads.variants.filterByOverlappingRegion(regInfo).collect()
-    val filteredGenotypeTrack = new OrderedTrackedLayout(input)
-    VizReads.printVariationJson(filteredGenotypeTrack)
-  }
-}
-
-class VizReads(protected val args: VizReadsArgs) extends ADAMSparkCommand[VizReadsArgs] {
-  val companion: ADAMCommandCompanion = VizReads
-
-  def run(sc: SparkContext, job: Job): Unit = {
-    VizReads.refName = args.refName
-
-    val proj = Projection(contig, readMapped, readName, start, end)
-
-    if (args.inputPath.endsWith(".bam") || args.inputPath.endsWith(".sam") || args.inputPath.endsWith(".align.adam")) {
-      VizReads.reads = sc.loadAlignments(args.inputPath, projection = Some(proj))
-    }
-
-    if (args.inputPath.endsWith(".vcf") || args.inputPath.endsWith(".gt.adam")) {
-      VizReads.variants = sc.loadGenotypes(args.inputPath, projection = Some(proj))
-    }
-
-    val server = new org.eclipse.jetty.server.Server(args.port)
-    val handlers = new org.eclipse.jetty.server.handler.ContextHandlerCollection()
-    server.setHandler(handlers)
-    handlers.addHandler(new org.eclipse.jetty.webapp.WebAppContext("adam-cli/src/main/webapp", "/"))
-    server.start()
-    println("View the visualization at: " + args.port)
-    println("Frequency visualization at: /freq")
-    println("Overlapping reads visualization at: /reads")
-    println("Variant visualization at: /variants")
-    server.join()
-  }
-}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceMapping.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceMapping.scala
deleted file mode 100644
index 80b8364bc7..0000000000
--- a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceMapping.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.models
-
-trait ReferenceMapping[T] {
-  def getReferenceName(value: T): String
-  def getReferenceRegion(value: T): ReferenceRegion
-}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/TrackedLayout.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/TrackedLayout.scala
deleted file mode 100644
index 654ddb01c3..0000000000
--- a/adam-core/src/main/scala/org/bdgenomics/adam/models/TrackedLayout.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.models
-
-import scala.collection.mutable
-
-/**
- * A TrackedLayout is an assignment of values of some type T (which presumably are mappable to
- * a reference genome or other linear coordinate space) to 'tracks' -- that is, to integers,
- * with the guarantee that no two values assigned to the same track will overlap.
- *
- * This is the kind of data structure which is required for non-overlapping genome visualization.
- *
- * @tparam T the type of value which is to be tracked.
- */
-trait TrackedLayout[T] {
-  def numTracks: Int
-  def trackAssignments: Map[T, Int]
-}
-
-object TrackedLayout {
-
-  def overlaps[T](rec1: T, rec2: T)(implicit rm: ReferenceMapping[T]): Boolean = {
-    val ref1 = rm.getReferenceRegion(rec1)
-    val ref2 = rm.getReferenceRegion(rec2)
-    ref1.overlaps(ref2)
-  }
-}
-
-/**
- * An implementation of TrackedLayout which takes a sequence of ReferenceMappable values,
- * and lays them out in order (i.e. from first-to-last) in the naive way: for each
- * value, it looks for the track with the lowest index that doesn't already have an overlapping
- * value, and it doesn't use any special data structures (it just does a linear search for each
- * track.)
- *
- * @param reads The set of values (i.e. "reads", but anything that is mappable to the reference
- *              genome ultimately) to lay out in tracks
- * @param mapping The (implicit) reference mapping which converts values to ReferenceRegions
- * @tparam T the type of value which is to be tracked.
- */
-class OrderedTrackedLayout[T](reads: Traversable[T])(implicit val mapping: ReferenceMapping[T]) extends TrackedLayout[T] {
-
-  private var trackBuilder = new mutable.ListBuffer[Track]()
-  reads.toSeq.foreach(findAndAddToTrack)
-  trackBuilder = trackBuilder.filter(_.records.nonEmpty)
-
-  val numTracks = trackBuilder.size
-  val trackAssignments: Map[T, Int] =
-    Map(trackBuilder.toList.zip(0 to numTracks).flatMap {
-      case (track: Track, idx: Int) => track.records.map(_ -> idx)
-    }: _*)
-
-  private def findAndAddToTrack(rec: T) {
-    val reg = mapping.getReferenceRegion(rec)
-    if (reg != null) {
-      val track: Option[Track] = trackBuilder.find(track => !track.conflicts(rec))
-      track.map(_ += rec).getOrElse(addTrack(new Track(rec)))
-    }
-  }
-
-  private def addTrack(t: Track): Track = {
-    trackBuilder += t
-    t
-  }
-
-  private class Track(val initial: T) {
-
-    val records = new mutable.ListBuffer[T]()
-    records += initial
-
-    def +=(rec: T): Track = {
-      records += rec
-      this
-    }
-
-    def conflicts(rec: T): Boolean =
-      records.exists(r => TrackedLayout.overlaps(r, rec))
-  }
-
-}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala
index 198b1e78fc..23bc27e0a4 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala
@@ -17,7 +17,7 @@
  */
 package org.bdgenomics.adam.rdd
 
-import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceMapping, ReferenceRegion }
+import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import scala.Predef._
@@ -53,12 +53,8 @@ object BroadcastRegionJoin {
    * operation. The result is the region-join.
    *
    * @param sc A SparkContext for the cluster that will perform the join
-   * @param baseRDD The 'left' side of the join, a set of values which correspond (through an implicit
-   *                ReferenceMapping) to regions on the genome.
-   * @param joinedRDD The 'right' side of the join, a set of values which correspond (through an implicit
-   *                  ReferenceMapping) to regions on the genome
-   * @param tMapping implicit reference mapping for baseRDD regions
-   * @param uMapping implicit reference mapping for joinedRDD regions
+   * @param baseRDD The 'left' side of the join
+   * @param joinedRDD The 'right' side of the join
    * @param tManifest implicit type of baseRDD
    * @param uManifest implicit type of joinedRDD
    * @tparam T type of baseRDD
@@ -67,11 +63,9 @@
    * corresponding to x overlaps the region corresponding to y.
    */
   def partitionAndJoin[T, U](sc: SparkContext,
-                             baseRDD: RDD[T],
-                             joinedRDD: RDD[U])(implicit tMapping: ReferenceMapping[T],
-                                                uMapping: ReferenceMapping[U],
-                                                tManifest: ClassTag[T],
-                                                uManifest: ClassTag[U]): RDD[(T, U)] = {
+                             baseRDD: RDD[(ReferenceRegion, T)],
+                             joinedRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T],
+                                                                   uManifest: ClassTag[U]): RDD[(T, U)] = {
 
     /**
      * Original Join Design:
@@ -100,7 +94,7 @@
     // and collect them.
     val collectedLeft: Seq[(String, Iterable[ReferenceRegion])] =
       baseRDD
-        .map(t => (tMapping.getReferenceName(t), tMapping.getReferenceRegion(t))) // RDD[(String,ReferenceRegion)]
+        .map(kv => (kv._1.referenceName, kv._1)) // RDD[(String,ReferenceRegion)]
         .groupBy(_._1) // RDD[(String,Seq[(String,ReferenceRegion)])]
         .map(t => (t._1, t._2.map(_._2))) // RDD[(String,Seq[ReferenceRegion])]
         .collect() // Iterable[(String,Seq[ReferenceRegion])]
@@ -115,26 +109,26 @@
     val regions = sc.broadcast(multiNonOverlapping)
 
     // each element of the left-side RDD should have exactly one partition.
-    val smallerKeyed: RDD[(ReferenceRegion, T)] =
-      baseRDD.keyBy(t => regions.value.regionsFor(t).head)
+    val smallerKeyed: RDD[(ReferenceRegion, (ReferenceRegion, T))] =
+      baseRDD.map(t => (regions.value.regionsFor(t).head, t))
 
     // each element of the right-side RDD may have 0, 1, or more than 1 corresponding partition.
-    val largerKeyed: RDD[(ReferenceRegion, U)] =
+    val largerKeyed: RDD[(ReferenceRegion, (ReferenceRegion, U))] =
       joinedRDD.filter(regions.value.filter(_))
         .flatMap(t => regions.value.regionsFor(t).map((r: ReferenceRegion) => (r, t)))
 
     // this is (essentially) performing a cartesian product within each partition...
-    val joined: RDD[(ReferenceRegion, (T, U))] =
+    val joined: RDD[(ReferenceRegion, ((ReferenceRegion, T), (ReferenceRegion, U)))] =
       smallerKeyed.join(largerKeyed)
 
     // ... so we need to filter the final pairs to make sure they're overlapping.
-    val filtered: RDD[(ReferenceRegion, (T, U))] = joined.filter({
-      case (rr: ReferenceRegion, (t: T, u: U)) =>
-        tMapping.getReferenceRegion(t).overlaps(uMapping.getReferenceRegion(u))
+    val filtered: RDD[(ReferenceRegion, ((ReferenceRegion, T), (ReferenceRegion, U)))] = joined.filter(kv => {
+      val (rr: ReferenceRegion, (t: (ReferenceRegion, T), u: (ReferenceRegion, U))) = kv
+      t._1.overlaps(u._1)
     })
 
     // finally, erase the partition key and return the result.
-    filtered.map(rrtu => rrtu._2)
+    filtered.map(rrtu => (rrtu._2._1._2, rrtu._2._2._2))
   }
 
   /**
@@ -146,15 +140,13 @@
    * realistic sized sets.
    *
   */
-  def cartesianFilter[T, U](baseRDD: RDD[T],
-                            joinedRDD: RDD[U])(implicit tMapping: ReferenceMapping[T],
-                                               uMapping: ReferenceMapping[U],
-                                               tManifest: ClassTag[T],
-                                               uManifest: ClassTag[U]): RDD[(T, U)] = {
+  def cartesianFilter[T, U](baseRDD: RDD[(ReferenceRegion, T)],
+                            joinedRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T],
+                                                                  uManifest: ClassTag[U]): RDD[(T, U)] = {
     baseRDD.cartesian(joinedRDD).filter({
-      case (t: T, u: U) =>
-        tMapping.getReferenceRegion(t).overlaps(uMapping.getReferenceRegion(u))
-    })
+      case (t: (ReferenceRegion, T), u: (ReferenceRegion, U)) =>
+        t._1.overlaps(u._1)
+    }).map(p => (p._1._2, p._2._2))
   }
 }
 
@@ -285,8 +277,8 @@ class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializ
    * @return An Iterable[ReferenceRegion], where each element of the Iterable is a nonoverlapping-region
    *         defined by 1 or more input-set regions.
    */
-  def regionsFor[U](regionable: U)(implicit mapping: ReferenceMapping[U]): Iterable[ReferenceRegion] =
-    findOverlappingRegions(mapping.getReferenceRegion(regionable))
+  def regionsFor[U](regionable: (ReferenceRegion, U)): Iterable[ReferenceRegion] =
+    findOverlappingRegions(regionable._1)
 
   /**
    * A quick filter, to find out if we even need to examine a particular input value for keying by
@@ -299,9 +291,8 @@ class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializ
   * @return a boolean -- the input value should only participate in the regionJoin if the return value
   *         here is 'true'.
   */
-  def hasRegionsFor[U](regionable: U)(implicit mapping: ReferenceMapping[U]): Boolean = {
-    val region = mapping.getReferenceRegion(regionable)
-    !(region.end <= endpoints.head || region.start >= endpoints.last)
+  def hasRegionsFor[U](regionable: (ReferenceRegion, U)): Boolean = {
+    !(regionable._1.end <= endpoints.head || regionable._1.start >= endpoints.last)
   }
 
   override def toString: String =
@@ -310,8 +301,8 @@ class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializ
 
 object NonoverlappingRegions {
 
-  def apply[T](values: Seq[T])(implicit refMapping: ReferenceMapping[T]) =
-    new NonoverlappingRegions(values.map(value => refMapping.getReferenceRegion(value)))
+  def apply[T](values: Seq[(ReferenceRegion, T)]) =
+    new NonoverlappingRegions(values.map(_._1))
 
   def alternating[T](seq: Seq[T], includeFirst: Boolean): Seq[T] = {
     val inds = if (includeFirst) { 0 until seq.size } else { 1 until seq.size + 1 }
@@ -340,23 +331,17 @@ class MultiContigNonoverlappingRegions(
   val regionMap: Map[String, NonoverlappingRegions] =
     Map(regions.map(r => (r._1, new NonoverlappingRegions(r._2))): _*)
 
-  def regionsFor[U](regionable: U)(implicit mapping: ReferenceMapping[U]): Iterable[ReferenceRegion] =
-    regionMap.get(mapping.getReferenceName(regionable)) match {
-      case None     => Seq()
-      case Some(nr) => nr.regionsFor(regionable)
-    }
+  def regionsFor[U](regionable: (ReferenceRegion, U)): Iterable[ReferenceRegion] =
+    regionMap.get(regionable._1.referenceName).fold(Iterable[ReferenceRegion]())(_.regionsFor(regionable))
 
-  def filter[U](value: U)(implicit mapping: ReferenceMapping[U]): Boolean =
-    regionMap.get(mapping.getReferenceName(value)) match {
-      case None     => false
-      case Some(nr) => nr.hasRegionsFor(value)
-    }
+  def filter[U](value: (ReferenceRegion, U)): Boolean =
+    regionMap.get(value._1.referenceName).fold(false)(_.hasRegionsFor(value))
 }
 
 object MultiContigNonoverlappingRegions {
-  def apply[T](values: Seq[T])(implicit mapping: ReferenceMapping[T]): MultiContigNonoverlappingRegions = {
+  def apply[T](values: Seq[(ReferenceRegion, T)]): MultiContigNonoverlappingRegions = {
    new MultiContigNonoverlappingRegions(
-      values.map(v => (mapping.getReferenceName(v), mapping.getReferenceRegion(v)))
+      values.map(kv => (kv._1.referenceName, kv._1))
        .groupBy(t => t._1)
        .map(t => (t._1, t._2.map(k => k._2)))
        .toSeq)
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicPartitioners.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicPartitioners.scala
index 38206f2065..5f2632a1b0 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicPartitioners.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicPartitioners.scala
@@ -17,7 +17,7 @@
  */
 package org.bdgenomics.adam.rdd
 
-import org.bdgenomics.adam.models.{ ReferenceRegion, ReferenceMapping, ReferencePosition, SequenceDictionary }
+import org.bdgenomics.adam.models.{ ReferenceRegion, ReferencePosition, SequenceDictionary }
 import org.apache.spark.{ Logging, Partitioner }
 import scala.math._
 
@@ -99,16 +99,12 @@ object GenomicPositionPartitioner {
     Map(seqDict.records.toSeq.map(rec => (rec.name.toString, rec.length)): _*)
 }
 
-case class GenomicRegionPartitioner[T: ReferenceMapping](partitionSize: Long, seqLengths: Map[String, Long], start: Boolean = true) extends Partitioner with Logging {
+case class GenomicRegionPartitioner(partitionSize: Long, seqLengths: Map[String, Long], start: Boolean = true) extends Partitioner with Logging {
   private val names: Seq[String] = seqLengths.keys.toSeq.sortWith(_ < _)
   private val lengths: Seq[Long] = names.map(seqLengths(_))
   private val parts: Seq[Int] = lengths.map(v => round(ceil(v.toDouble / partitionSize)).toInt)
   private val cumulParts: Map[String, Int] = Map(names.zip(parts.scan(0)(_ + _)): _*)
 
-  private def extractReferenceRegion(k: T)(implicit tMapping: ReferenceMapping[T]): ReferenceRegion = {
-    tMapping.getReferenceRegion(k)
-  }
-
   private def computePartition(refReg: ReferenceRegion): Int = {
     val pos = if (start) refReg.start else (refReg.end - 1)
     (cumulParts(refReg.referenceName) + pos / partitionSize).toInt
@@ -118,13 +114,13 @@ case class GenomicRegionPartitioner[T: ReferenceMapping](partitionSize: Long, se
 
   override def getPartition(key: Any): Int = {
     key match {
-      case mappable: T => computePartition(extractReferenceRegion(mappable))
-      case _           => throw new IllegalArgumentException("Only ReferenceMappable values can be partitioned by GenomicRegionPartitioner")
+      case region: ReferenceRegion => computePartition(region)
+      case _                       => throw new IllegalArgumentException("Only ReferenceMappable values can be partitioned by GenomicRegionPartitioner")
     }
   }
 }
 
 object GenomicRegionPartitioner {
-  def apply[T: ReferenceMapping](partitionSize: Long, seqDict: SequenceDictionary): GenomicRegionPartitioner[T] =
+  def apply(partitionSize: Long, seqDict: SequenceDictionary): GenomicRegionPartitioner =
     GenomicRegionPartitioner(partitionSize, GenomicPositionPartitioner.extractLengthMap(seqDict))
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala
index c450a25dbc..35c4a8fe2c 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala
@@ -21,8 +21,7 @@ package org.bdgenomics.adam.rdd
 import org.apache.spark.{ Logging, Partitioner, SparkContext }
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion, ReferenceMapping }
-
+import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }
 import scala.collection.mutable.ListBuffer
 import scala.math._
 import scala.reflect.ClassTag
@@ -40,15 +39,11 @@ object ShuffleRegionJoin {
    * the object in each bin. Finally, each bin independently performs a chromsweep sort-merge join.
   *
   * @param sc A SparkContext for the cluster that will perform the join
-   * @param leftRDD The 'left' side of the join, a set of values which correspond (through an implicit
-   *                ReferenceMapping) to regions on the genome.
-   * @param rightRDD The 'right' side of the join, a set of values which correspond (through an implicit
-   *                 ReferenceMapping) to regions on the genome
+   * @param leftRDD The 'left' side of the join
+   * @param rightRDD The 'right' side of the join
    * @param seqDict A SequenceDictionary -- every region corresponding to either the leftRDD or rightRDD
   *                values must be mapped to a chromosome with an entry in this dictionary.
   * @param partitionSize The size of the genome bin in nucleotides. Controls the parallelism of the join.
-   * @param tMapping implicit reference mapping for leftRDD regions
-   * @param uMapping implicit reference mapping for rightRDD regions
    * @param tManifest implicit type of leftRDD
   * @param uManifest implicit type of rightRDD
   * @tparam T type of leftRDD
@@ -57,12 +52,10 @@ object ShuffleRegionJoin {
   * corresponding to x overlaps the region corresponding to y.
   */
   def partitionAndJoin[T, U](sc: SparkContext,
-                             leftRDD: RDD[T],
-                             rightRDD: RDD[U],
+                             leftRDD: RDD[(ReferenceRegion, T)],
+                             rightRDD: RDD[(ReferenceRegion, U)],
                              seqDict: SequenceDictionary,
-                             partitionSize: Long)(implicit tMapping: ReferenceMapping[T],
-                                                  uMapping: ReferenceMapping[U],
-                                                  tManifest: ClassTag[T],
+                             partitionSize: Long)(implicit tManifest: ClassTag[T],
                                                   uManifest: ClassTag[U]): RDD[(T, U)] = {
     // Create the set of bins across the genome for parallel processing
     val seqLengths = Map(seqDict.records.toSeq.map(rec => (rec.name.toString, rec.length)): _*)
@@ -71,15 +64,15 @@ object ShuffleRegionJoin {
     // Key each RDD element to its corresponding bin
     // Elements may be replicated if they overlap multiple bins
     val keyedLeft: RDD[((ReferenceRegion, Int), T)] =
-      leftRDD.flatMap(x => {
-        val region = tMapping.getReferenceRegion(x)
+      leftRDD.flatMap(kv => {
+        val (region, x) = kv
         val lo = bins.value.getStartBin(region)
         val hi = bins.value.getEndBin(region)
         (lo to hi).map(i => ((region, i), x))
       })
     val keyedRight: RDD[((ReferenceRegion, Int), U)] =
-      rightRDD.flatMap(y => {
-        val region = uMapping.getReferenceRegion(y)
+      rightRDD.flatMap(kv => {
+        val (region, y) = kv
         val lo = bins.value.getStartBin(region)
         val hi = bins.value.getEndBin(region)
         (lo to hi).map(i => ((region, i), y))
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/GeneFeatureRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/GeneFeatureRDDFunctions.scala
index 4f8d323e55..5046a2551b 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/GeneFeatureRDDFunctions.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/GeneFeatureRDDFunctions.scala
@@ -21,7 +21,6 @@
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.bdgenomics.adam.models._
-import org.bdgenomics.adam.rich.ReferenceMappingContext.FeatureReferenceMapping
 import org.bdgenomics.formats.avro.{ Strand, Feature }
 import scala.collection.JavaConversions._
 
@@ -71,7 +70,7 @@ class GeneFeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable wit
         case ("exon", ftr: Feature) =>
           val ids: Seq[String] = ftr.getParentIds.map(_.toString)
           ids.map(transcriptId => (transcriptId,
-            Exon(ftr.getFeatureId.toString, transcriptId, strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
+            Exon(ftr.getFeatureId.toString, transcriptId, strand(ftr.getStrand), ReferenceRegion(ftr))))
       }.groupByKey()
 
     val cdsByTranscript: RDD[(String, Iterable[CDS])] =
@@ -79,7 +78,7 @@ class GeneFeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable wit
         case ("CDS", ftr: Feature) =>
           val ids: Seq[String] = ftr.getParentIds.map(_.toString)
           ids.map(transcriptId => (transcriptId,
-            CDS(transcriptId, strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
+            CDS(transcriptId, strand(ftr.getStrand), ReferenceRegion(ftr))))
       }.groupByKey()
 
     val utrsByTranscript: RDD[(String, Iterable[UTR])] =
@@ -87,7 +86,7 @@ class GeneFeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable wit
         case ("UTR", ftr: Feature) =>
           val ids: Seq[String] = ftr.getParentIds.map(_.toString)
           ids.map(transcriptId => (transcriptId,
-            UTR(transcriptId, strand(ftr.getStrand), FeatureReferenceMapping.getReferenceRegion(ftr))))
+            UTR(transcriptId, strand(ftr.getStrand), ReferenceRegion(ftr))))
       }.groupByKey()
 
     // Step #3
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceMappingContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceMappingContext.scala
deleted file mode 100644
index de615ee77b..0000000000
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceMappingContext.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.rich
-
-import org.bdgenomics.adam.models.{ ReferenceRegion, ReferenceMapping }
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Feature, Genotype }
-
-/**
- * A common location in which to drop some ReferenceMapping implementations.
- */
-object ReferenceMappingContext {
-
-  implicit object GenotypeReferenceMapping extends ReferenceMapping[Genotype] with Serializable {
-    override def getReferenceName(value: Genotype): String = value.getVariant.getContig.getContigName.toString
-    override def getReferenceRegion(value: Genotype): ReferenceRegion =
-      ReferenceRegion(value.getVariant.getContig.getContigName.toString, value.getVariant.getStart, value.getVariant.getEnd)
-  }
-
-  implicit object AlignmentRecordReferenceMapping extends ReferenceMapping[AlignmentRecord] with Serializable {
-    override def getReferenceName(value: AlignmentRecord): String = value.getContig.getContigName.toString
-    override def getReferenceRegion(value: AlignmentRecord): ReferenceRegion = ReferenceRegion(value).orNull
-  }
-
-  implicit object ReferenceRegionReferenceMapping extends ReferenceMapping[ReferenceRegion] with Serializable {
-    override def getReferenceName(value: ReferenceRegion): String = value.referenceName.toString
-    override def getReferenceRegion(value: ReferenceRegion): ReferenceRegion = value
-  }
-
-  implicit object FeatureReferenceMapping extends ReferenceMapping[Feature] with Serializable {
-    override def getReferenceName(value: Feature): String = value.getContig.getContigName.toString
-    override def getReferenceRegion(value: Feature): ReferenceRegion = ReferenceRegion(value)
-  }
-}
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/TrackedLayoutSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/TrackedLayoutSuite.scala
deleted file mode 100644
index 5c63d9e645..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/models/TrackedLayoutSuite.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.models
-
-import org.bdgenomics.adam.rich.ReferenceMappingContext._
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
-import org.scalatest.FunSuite
-
-class TrackedLayoutSuite extends FunSuite {
-
-  def rec(contig: String = null, start: Int = 0, cigar: String = null, end: Int = 0, readMapped: Boolean = true, firstOfPair: Boolean = true): AlignmentRecord = {
-    val c = Contig.newBuilder().setContigName(contig).build()
-
-    AlignmentRecord.newBuilder()
-      .setContig(c)
-      .setReadMapped(readMapped)
-      .setFirstOfPair(firstOfPair)
-      .setStart(start)
-      .setCigar(cigar)
-      .setEnd(end)
-      .build()
-  }
-
-  test("OrderedTrackedLayout lays out no records into zero tracks") {
-    val layout = new OrderedTrackedLayout[AlignmentRecord](Seq())
-    assert(layout.numTracks === 0)
-  }
-
-  test("OrderedTrackedLayout lays out AlignmentRecords left-to-right in the order they're passed") {
-    val (r1, r2, r3, r4) = (
-      rec("chr1", 100, "100M", 200),
-      rec("chr1", 150, "100M", 250),
-      rec("chr1", 200, "100M", 300),
-      rec("chr2", 200, "100M", 300))
-
-    val recs = Seq(r1, r2, r3, r4)
-    val layout: OrderedTrackedLayout[AlignmentRecord] = new OrderedTrackedLayout(recs)
-
-    // Just making sure... r4 shouldn't overlap r1-3
-    val rm: ReferenceMapping[AlignmentRecord] = AlignmentRecordReferenceMapping
-    val refs123 = Seq(r1, r2, r3).map(rm.getReferenceRegion)
-    assert(!refs123.exists(rm.getReferenceRegion(r4).overlaps))
-
-    // Now, test the correct track assignments of each record
-    assert(layout.trackAssignments.get(r1) === Some(0))
-    assert(layout.trackAssignments.get(r2) === Some(1))
-    assert(layout.trackAssignments.get(r3) === Some(0))
-    assert(layout.trackAssignments.get(r4) === Some(0))
-    assert(layout.numTracks === 2)
-  }
-
-  test("OrderedTrackedLayout can handle unaligned reads") {
-    val (r1, r2, r3, r4) = (
-      rec("chr1", 100, "100M", 200),
-      rec("chr1", 150, "100M", 250),
-      rec("chr1", 200, "100M", 300),
-      rec(readMapped = false))
-
-    val recs = Seq(r1, r2, r3, r4)
-    val layout: OrderedTrackedLayout[AlignmentRecord] = new OrderedTrackedLayout(recs)
-
-    // Just making sure... r4 shouldn't overlap r1-3
-    val rm: ReferenceMapping[AlignmentRecord] = AlignmentRecordReferenceMapping
-    val refs123 = Seq(r1, r2, r3).map(rm.getReferenceRegion)
-
-    // Now, test the correct track assignments of each record
-    assert(layout.trackAssignments.get(r1) === Some(0))
-    assert(layout.trackAssignments.get(r2) === Some(1))
-    assert(layout.trackAssignments.get(r3) === Some(0))
-    assert(layout.trackAssignments.get(r4) === None)
-    assert(layout.numTracks === 2)
-  }
-}
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala
index d5706f1751..3ce6d6af3a 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala
@@ -18,8 +18,7 @@
 package org.bdgenomics.adam.rdd
 
 import org.apache.spark.SparkContext._
-import org.bdgenomics.adam.models.{ ReferenceMapping, ReferenceRegion, SequenceDictionary, SequenceRecord }
-import org.bdgenomics.adam.rich.ReferenceMappingContext._
+import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary, SequenceRecord }
 import org.bdgenomics.adam.util.ADAMFunSuite
 import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
 
@@ -102,9 +101,9 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
     val record2 = AlignmentRecord.newBuilder(built).setStart(3L).setEnd(4L).build()
     val baseRecord = AlignmentRecord.newBuilder(built).setCigar("4M").setEnd(5L).build()
 
-    val baseMapping = new NonoverlappingRegions(Seq(AlignmentRecordReferenceMapping.getReferenceRegion(baseRecord)))
-    val regions1 = baseMapping.findOverlappingRegions(AlignmentRecordReferenceMapping.getReferenceRegion(record1))
-    val regions2 = baseMapping.findOverlappingRegions(AlignmentRecordReferenceMapping.getReferenceRegion(record2))
+    val baseMapping = new NonoverlappingRegions(Seq(ReferenceRegion(baseRecord).get))
+    val regions1 = baseMapping.findOverlappingRegions(ReferenceRegion(record1).get)
+    val regions2 = baseMapping.findOverlappingRegions(ReferenceRegion(record2).get)
     assert(regions1.size === 1)
     assert(regions2.size === 1)
     assert(regions1.head === regions2.head)
@@ -127,8 +126,8 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
     val record1 = builder.build()
     val record2 = builder.build()
 
-    val rdd1 = sc.parallelize(Seq(record1))
-    val rdd2 = sc.parallelize(Seq(record2))
+    val rdd1 = sc.parallelize(Seq(record1)).keyBy(ReferenceRegion(_).get)
+    val rdd2 = sc.parallelize(Seq(record2)).keyBy(ReferenceRegion(_).get)
 
     assert(BroadcastRegionJoinSuite.getReferenceRegion(record1) ===
       BroadcastRegionJoinSuite.getReferenceRegion(record2))
@@ -168,8 +167,8 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
     val record2 = AlignmentRecord.newBuilder(built).setStart(3L).setEnd(4L).build()
     val baseRecord = AlignmentRecord.newBuilder(built).setCigar("4M").setEnd(5L).build()
 
-    val baseRdd = sc.parallelize(Seq(baseRecord))
-    val recordsRdd = sc.parallelize(Seq(record1, record2))
+    val baseRdd = sc.parallelize(Seq(baseRecord)).keyBy(ReferenceRegion(_).get)
+    val recordsRdd = sc.parallelize(Seq(record1, record2)).keyBy(ReferenceRegion(_).get)
 
     assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
       sc,
@@ -219,8 +218,8 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
     val baseRecord1 = AlignmentRecord.newBuilder(builtRef1).setCigar("4M").setEnd(5L).build()
     val baseRecord2 = AlignmentRecord.newBuilder(builtRef2).setCigar("4M").setEnd(5L).build()
 
-    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2))
-    val recordsRdd = sc.parallelize(Seq(record1, record2, record3))
+    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2)).keyBy(ReferenceRegion(_).get)
+    val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get)
 
     assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
       sc,
@@ -270,8 +269,8 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
     val baseRecord1 = AlignmentRecord.newBuilder(builtRef1).setCigar("4M").setEnd(5L).build()
     val baseRecord2 = AlignmentRecord.newBuilder(builtRef2).setCigar("4M").setEnd(5L).build()
 
-    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2))
-    val recordsRdd = sc.parallelize(Seq(record1, record2, record3))
+    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2)).keyBy(ReferenceRegion(_).get)
+    val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get)
 
     assert(BroadcastRegionJoin.cartesianFilter(
       baseRdd,
@@ -293,8 +292,8 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
 }
 
 object BroadcastRegionJoinSuite {
-  def getReferenceRegion[T](record: T)(implicit mapping: ReferenceMapping[T]): ReferenceRegion =
-    mapping.getReferenceRegion(record)
+  def getReferenceRegion(record: AlignmentRecord): ReferenceRegion =
+    ReferenceRegion(record).get
 
   def merge(prev: Boolean, next: (AlignmentRecord, AlignmentRecord)): Boolean =
     prev && getReferenceRegion(next._1).overlaps(getReferenceRegion(next._2))
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala
index 36a54fd37a..0c8d5aa70b 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala
@@ -19,8 +19,7 @@ package org.bdgenomics.adam.rdd
 
 import org.apache.spark.HashPartitioner
 import org.apache.spark.SparkContext._
-import org.bdgenomics.adam.models.{ ReferenceMapping, ReferenceRegion, SequenceDictionary, SequenceRecord }
-import org.bdgenomics.adam.rich.ReferenceMappingContext._
+import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary, SequenceRecord }
 import org.bdgenomics.adam.util.ADAMFunSuite
 import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
 
@@ -53,8 +52,8 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
     val record2 = AlignmentRecord.newBuilder(built).setStart(3L).setEnd(4L).build()
     val baseRecord = AlignmentRecord.newBuilder(built).setCigar("4M").setEnd(5L).build()
 
-    val baseRdd = sc.parallelize(Seq(baseRecord))
-    val recordsRdd = sc.parallelize(Seq(record1, record2))
+    val baseRdd = sc.parallelize(Seq(baseRecord)).keyBy(ReferenceRegion(_).get)
+    val recordsRdd = sc.parallelize(Seq(record1, record2)).keyBy(ReferenceRegion(_).get)
 
     assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
       sc,
@@ -108,8 +107,8 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
     val baseRecord1 = AlignmentRecord.newBuilder(builtRef1).setCigar("4M").setEnd(5L).build()
     val baseRecord2 = AlignmentRecord.newBuilder(builtRef2).setCigar("4M").setEnd(5L).build()
 
-    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2))
-    val recordsRdd = sc.parallelize(Seq(record1, record2, record3))
+    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2)).keyBy(ReferenceRegion(_).get)
+    val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get)
 
     assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
       sc,
@@ -163,8 +162,8 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
     val baseRecord1 = AlignmentRecord.newBuilder(builtRef1).setCigar("4M").setEnd(5L).build()
     val baseRecord2 = AlignmentRecord.newBuilder(builtRef2).setCigar("4M").setEnd(5L).build()
 
-    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2))
-    val recordsRdd = sc.parallelize(Seq(record1, record2, record3))
+    val baseRdd = sc.parallelize(Seq(baseRecord1, baseRecord2)).keyBy(ReferenceRegion(_).get)
+    val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get)
 
     assert(BroadcastRegionJoin.cartesianFilter(
       baseRdd,
@@ -188,8 +187,8 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
 }
 
 object ShuffleRegionJoinSuite {
-  def getReferenceRegion[T](record: T)(implicit mapping: ReferenceMapping[T]): ReferenceRegion =
-    mapping.getReferenceRegion(record)
+  def getReferenceRegion(record: AlignmentRecord): ReferenceRegion =
+    ReferenceRegion(record).get
 
   def merge(prev: Boolean, next: (AlignmentRecord, AlignmentRecord)): Boolean =
     prev && getReferenceRegion(next._1).overlaps(getReferenceRegion(next._2))
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rich/AlignmentRecordReferenceMappingSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rich/AlignmentRecordReferenceMappingSuite.scala
deleted file mode 100644
index bb4fb3be85..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rich/AlignmentRecordReferenceMappingSuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.rich
-
-import org.bdgenomics.adam.rich.ReferenceMappingContext.AlignmentRecordReferenceMapping
-import org.bdgenomics.adam.util.ADAMFunSuite
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
-
-class AlignmentRecordReferenceMappingSuite extends ADAMFunSuite {
-
-  sparkTest("test getReferenceId returns the right referenceId") {
-    val contig = Contig.newBuilder
-      .setContigName("chr12")
-      .build
-
-    val rec = AlignmentRecord.newBuilder().setContig(contig).build()
-    assert(AlignmentRecordReferenceMapping.getReferenceName(rec) === "chr12")
-  }
-}
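
Migration note (editorial, not part of the patch): with ReferenceMapping removed, BroadcastRegionJoin.partitionAndJoin, BroadcastRegionJoin.cartesianFilter, and ShuffleRegionJoin.partitionAndJoin all take RDDs that are explicitly keyed by ReferenceRegion, and GenomicRegionPartitioner now partitions plain ReferenceRegion keys. A minimal sketch of what a caller looks like after this change, modeled on the CalculateDepth and test-suite updates above; the variable names (features, reads) and the joinFeaturesToReads wrapper are hypothetical, not part of the ADAM API:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.bdgenomics.adam.models.ReferenceRegion
    import org.bdgenomics.adam.rdd.BroadcastRegionJoin
    import org.bdgenomics.formats.avro.{ AlignmentRecord, Feature }

    def joinFeaturesToReads(sc: SparkContext,
                            features: RDD[Feature],
                            reads: RDD[AlignmentRecord]): RDD[(Feature, AlignmentRecord)] = {
      // ReferenceRegion(read) is an Option (unmapped reads have no region), so
      // restrict to mapped reads before calling .get, as the suites above do.
      val keyedReads = reads.filter(_.getReadMapped).keyBy(r => ReferenceRegion(r).get)
      // ReferenceRegion(feature) is total, per the GeneFeatureRDDFunctions change.
      val keyedFeatures = features.keyBy(f => ReferenceRegion(f))
      // Both sides are now RDD[(ReferenceRegion, _)]; the join returns the bare values.
      BroadcastRegionJoin.partitionAndJoin(sc, keyedFeatures, keyedReads)
    }

The same keying applies to ShuffleRegionJoin.partitionAndJoin, which additionally takes a SequenceDictionary and a partitionSize, as shown in its updated signature above.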