[ADAM-1683] Pull in Spark-BAM as a secondary loading path. #1686

Closed

4 changes: 2 additions & 2 deletions adam-core/pom.xml
@@ -189,8 +189,8 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>org.hammerlab</groupId>
-      <artifactId>genomic-loci_${scala.version.prefix}</artifactId>
+      <groupId>org.hammerlab.genomics</groupId>
+      <artifactId>loci_${scala.version.prefix}</artifactId>
       <scope>compile</scope>
     </dependency>
     <dependency>

@@ -111,10 +111,10 @@ object ReferenceRegion {
       case LociRanges(ranges) => {
         ranges.map(range => range match {
           case LociRange(contigName, start, None) => {
-            ReferencePosition(contigName, start)
+            ReferencePosition(contigName.toString, start)
           }
           case LociRange(contigName, start, Some(end)) => {
-            ReferenceRegion(contigName, start, end)
+            ReferenceRegion(contigName.toString, start, end)
           }
         })
       }
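
A note on the `.toString` additions above: in the upgraded loci library, the contig field of `LociRange` is an `org.hammerlab.genomics.reference.ContigName` rather than a bare `String`, while ADAM's `ReferencePosition` and `ReferenceRegion` constructors still take `String`. A minimal sketch of the mismatch, assuming for illustration that `ContigName` simply wraps a name (the real class is richer):

```scala
// Hypothetical stand-in for org.hammerlab.genomics.reference.ContigName,
// shown only to illustrate why the explicit .toString conversion is needed.
case class ContigName(name: String) {
  override def toString: String = name
}

case class LociRange(contigName: ContigName, start: Long, end: Option[Long])

// ADAM's ReferencePosition takes a plain String contig name:
case class ReferencePosition(referenceName: String, pos: Long)

val range = LociRange(ContigName("chr1"), 100L, None)
// ReferencePosition(range.contigName, range.start) // type error: ContigName != String
ReferencePosition(range.contigName.toString, range.start) // compiles
```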
40 changes: 34 additions & 6 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,7 +18,12 @@
 package org.bdgenomics.adam.rdd

 import java.io.{ File, FileNotFoundException, InputStream }
-import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
+import htsjdk.samtools.{
+  SAMFileHeader,
+  SAMProgramRecord,
+  SAMRecord,
+  ValidationStringency
+}
 import htsjdk.samtools.util.Locatable
 import htsjdk.variant.vcf.{
   VCFHeader,
@@ -104,6 +109,8 @@ import org.bdgenomics.formats.avro.{
 import org.bdgenomics.utils.instrumentation.Metrics
 import org.bdgenomics.utils.io.LocalFileByteAccess
 import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
+import spark_bam._
+import org.hammerlab.paths.{ Path => HLPath }
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods._
 import org.seqdoop.hadoop_bam._
@@ -142,6 +149,8 @@ private case class LocatableReferenceRegion(rr: ReferenceRegion) extends Locatable
  */
 object ADAMContext {

+  val USE_SPARK_BAM = "org.bdgenomics.adam.rdd.ADAMContext.USE_SPARK_BAM"
+
   // conversion functions for pipes
   implicit def sameTypeConversionFn[T, U <: GenomicRDD[T, U]](gRdd: U,
                                                               rdd: RDD[T]): U = {
@@ -1507,19 +1516,38 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging
     // contains bams, hadoop-bam is a-ok! i believe that it is better (perf) to
     // just load from a single newAPIHadoopFile call instead of a union across
     // files, so we do that whenever possible
+    def hadoopBamRead(path: String): RDD[SAMRecord] = {
+      sc.newAPIHadoopFile(path,
+        classOf[AnySAMInputFormat],
+        classOf[LongWritable],
+        classOf[SAMRecordWritable],
+        ContextUtil.getConfiguration(job)).map(_._2.get)
+    }
+    val readerFn = if (sc.hadoopConfiguration.getBoolean(ADAMContext.USE_SPARK_BAM, false)) {
+      def sparkBamRead(path: String): RDD[SAMRecord] = {
+        if (path.endsWith(".bam")) {
+          System.err.println("Using Spark-BAM")
+          sc.loadReads(HLPath(path))
+        } else {
+          hadoopBamRead(path)
+        }
+      }
+
+      sparkBamRead(_)
+    } else {
+      hadoopBamRead(_)
+    }
     val records = if (filteredFiles.length != bamFiles.length) {
       sc.union(filteredFiles.map(p => {
-        sc.newAPIHadoopFile(p.toString, classOf[AnySAMInputFormat], classOf[LongWritable],
-          classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
+        readerFn(p.toString)
       }))
     } else {
-      sc.newAPIHadoopFile(pathName, classOf[AnySAMInputFormat], classOf[LongWritable],
-        classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
+      readerFn(pathName)
     }
     if (Metrics.isRecording) records.instrument() else records
     val samRecordConverter = new SAMRecordConverter

-    AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
+    AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p)),
       seqDict,
       readGroups,
       programs)
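
Taken together, the change above makes spark-bam strictly opt-in: `readerFn` only dispatches to `sc.loadReads` when the Hadoop configuration flag is set *and* the path ends in `.bam`; everything else falls back to the existing hadoop-bam path. A minimal usage sketch mirroring the tests added below (the input path is hypothetical):

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.ADAMContext._

val sc: SparkContext = ??? // an existing SparkContext

// Opt in to the spark-bam loading path; hadoop-bam remains the default.
sc.hadoopConfiguration.setBoolean(ADAMContext.USE_SPARK_BAM, true)

// .bam inputs now load via spark-bam; .sam and other inputs still go
// through hadoop-bam's AnySAMInputFormat.
val reads = sc.loadAlignments("hdfs:///data/sample.bam")
```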

@@ -301,6 +301,13 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[org.apache.spark.sql.types.StructField])
     kryo.register(classOf[org.apache.spark.sql.types.StructType])

+    // org.hammerlab
+    kryo.register(classOf[org.hammerlab.bam.header.ContigLengths])
+    kryo.register(classOf[org.hammerlab.bam.header.Header])
+    kryo.register(classOf[org.hammerlab.bgzf.Pos])
+    kryo.register(classOf[org.hammerlab.genomics.reference.ContigName])
+    kryo.register(Class.forName("org.hammerlab.genomics.reference.package$Locus"))
+
     // scala
     kryo.register(classOf[scala.Array[scala.Array[Byte]]])
     kryo.register(classOf[scala.Array[htsjdk.variant.vcf.VCFHeader]])
@@ -343,6 +350,7 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
     kryo.register(classOf[scala.Array[String]])
     kryo.register(classOf[scala.Array[Option[_]]])
     kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))
+    kryo.register(Class.forName("scala.math.Ordering$Int$"))

     // scala.collection
     kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
@@ -354,9 +362,12 @@
     // scala.collection.immutable
     kryo.register(classOf[scala.collection.immutable.::[_]])
     kryo.register(classOf[scala.collection.immutable.Range])
+    kryo.register(Class.forName("scala.collection.immutable.RedBlackTree$BlackTree"))
+    kryo.register(Class.forName("scala.collection.immutable.RedBlackTree$RedTree"))
     kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
     kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))
     kryo.register(Class.forName("scala.collection.immutable.Set$EmptySet$"))
+    kryo.register(classOf[scala.collection.immutable.TreeMap[_, _]])

     // scala.collection.mutable
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
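
The new registrations cover the hammerlab types that spark-bam ships between executors (e.g. `org.hammerlab.bgzf.Pos`, a BGZF virtual offset) plus the Scala collection internals they drag in (`TreeMap`'s red-black nodes, `Ordering$Int$`). They matter when Kryo registration is required — as ADAM's test setup does, if I recall the wiring correctly — since any unregistered class then fails fast. A sketch of that configuration, offered as an assumption about the wiring rather than something shown in this diff:

```scala
import org.apache.spark.SparkConf

// With registrationRequired on, serializing a class that the registrator
// missed throws "Class is not registered" instead of silently degrading.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true")
```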

@@ -88,6 +88,16 @@ class ADAMContextSuite extends ADAMFunSuite {
     assert(reads.dataset.rdd.count === 20)
   }

+  sparkTest("can read a small .SAM file using spark-bam") {
+    val path = testFile("small.sam")
+    sc.hadoopConfiguration.setBoolean(ADAMContext.USE_SPARK_BAM, true)
+    val reads = sc.loadAlignments(path)
+    val loc = tmpLocation(".bam")
+    reads.saveAsSam(loc, asSingleFile = true)
+    assert(sc.loadAlignments(loc).rdd.count() === 20)
+    assert(sc.loadAlignments(loc).dataset.count() === 20)
+  }
+
   sparkTest("loading a sam file with a bad header and strict stringency should fail") {
     val path = testFile("badheader.sam")
     intercept[SAMFormatException] {
@@ -118,6 +128,17 @@
     assert(reads.count() === 18)
   }

+  sparkTest("can filter a .SAM file based on quality with spark-bam") {
+    val path = testFile("small.sam")
+    val loc = tmpLocation(".bam")
+
+    sc.hadoopConfiguration.setBoolean(ADAMContext.USE_SPARK_BAM, true)
+    sc.loadAlignments(path)
+      .transform(_.filter(a => (a.getReadMapped && a.getMapq > 30)))
+      .saveAsSam(loc, asSingleFile = true)
+    assert(sc.loadAlignments(loc).rdd.count() === 18)
+  }
+
   test("Can convert to phred") {
     assert(successProbabilityToPhred(0.9) === 10)
     assert(successProbabilityToPhred(0.99999) === 50)
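
One caveat with the two new tests: they set `USE_SPARK_BAM` on the shared `SparkContext`'s Hadoop configuration and never reset it, so the flag can leak into later tests that reuse the same context. A defensive wrapper, offered as a hypothetical suggestion rather than anything this PR does:

```scala
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext

// Hypothetical helper: run a block with spark-bam enabled, then restore the
// previous flag value so later tests see the default hadoop-bam path again.
def withSparkBam[T](sc: SparkContext)(body: => T): T = {
  val conf = sc.hadoopConfiguration
  val previous = conf.getBoolean(ADAMContext.USE_SPARK_BAM, false)
  conf.setBoolean(ADAMContext.USE_SPARK_BAM, true)
  try body
  finally conf.setBoolean(ADAMContext.USE_SPARK_BAM, previous)
}
```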
15 changes: 10 additions & 5 deletions pom.xml
@@ -403,13 +403,13 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.hammerlab</groupId>
-      <artifactId>genomic-loci_${scala.version.prefix}</artifactId>
-      <version>1.4.4</version>
+      <groupId>org.hammerlab.genomics</groupId>
+      <artifactId>loci_${scala.version.prefix}</artifactId>
+      <version>2.0.1</version>
       <exclusions>
         <exclusion>
-          <groupId>org.bdgenomics.bdg-formats</groupId>
-          <artifactId>*</artifactId>
+          <groupId>org.hammerlab</groupId>
+          <artifactId>iterator_2.10</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -553,6 +553,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.hammerlab</groupId>
+      <artifactId>spark-bam_2.10</artifactId>
+      <version>${spark-bam.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.github.samtools</groupId>
       <artifactId>htsjdk</artifactId>