Skip to content

Commit

Permalink
Merge 9e0e5d5 into 00ecde1
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft authored Dec 23, 2017
2 parents 00ecde1 + 9e0e5d5 commit 084f071
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 15 deletions.
4 changes: 2 additions & 2 deletions adam-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hammerlab</groupId>
<artifactId>genomic-loci_${scala.version.prefix}</artifactId>
<groupId>org.hammerlab.genomics</groupId>
<artifactId>loci_${scala.version.prefix}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ object ReferenceRegion {
case LociRanges(ranges) => {
ranges.map(range => range match {
case LociRange(contigName, start, None) => {
ReferencePosition(contigName, start)
ReferencePosition(contigName.toString, start)
}
case LociRange(contigName, start, Some(end)) => {
ReferenceRegion(contigName, start, end)
ReferenceRegion(contigName.toString, start, end)
}
})
}
Expand Down
40 changes: 34 additions & 6 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }
import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
import htsjdk.samtools.{
SAMFileHeader,
SAMProgramRecord,
SAMRecord,
ValidationStringency
}
import htsjdk.samtools.util.Locatable
import htsjdk.variant.vcf.{
VCFHeader,
Expand Down Expand Up @@ -104,6 +109,8 @@ import org.bdgenomics.formats.avro.{
import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
import spark_bam._
import org.hammerlab.paths.{ Path => HLPath }
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.seqdoop.hadoop_bam._
Expand Down Expand Up @@ -142,6 +149,8 @@ private case class LocatableReferenceRegion(rr: ReferenceRegion) extends Locatab
*/
object ADAMContext {

val USE_SPARK_BAM = "org.bdgenomics.adam.rdd.ADAMContext.USE_SPARK_BAM"

// conversion functions for pipes
implicit def sameTypeConversionFn[T, U <: GenomicRDD[T, U]](gRdd: U,
rdd: RDD[T]): U = {
Expand Down Expand Up @@ -1507,19 +1516,38 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
// contains bams, hadoop-bam is a-ok! i believe that it is better (perf) to
// just load from a single newAPIHadoopFile call instead of a union across
// files, so we do that whenever possible
def hadoopBamRead(path: String): RDD[SAMRecord] = {
sc.newAPIHadoopFile(path,
classOf[AnySAMInputFormat],
classOf[LongWritable],
classOf[SAMRecordWritable],
ContextUtil.getConfiguration(job)).map(_._2.get)
}
val readerFn = if (sc.hadoopConfiguration.getBoolean(ADAMContext.USE_SPARK_BAM, false)) {
def sparkBamRead(path: String): RDD[SAMRecord] = {
if (path.endsWith(".bam")) {
System.err.println("Using Spark-BAM")
sc.loadReads(HLPath(path))
} else {
hadoopBamRead(path)
}
}

sparkBamRead(_)
} else {
hadoopBamRead(_)
}
val records = if (filteredFiles.length != bamFiles.length) {
sc.union(filteredFiles.map(p => {
sc.newAPIHadoopFile(p.toString, classOf[AnySAMInputFormat], classOf[LongWritable],
classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
readerFn(p.toString)
}))
} else {
sc.newAPIHadoopFile(pathName, classOf[AnySAMInputFormat], classOf[LongWritable],
classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
readerFn(pathName)
}
if (Metrics.isRecording) records.instrument() else records
val samRecordConverter = new SAMRecordConverter

AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p)),
seqDict,
readGroups,
programs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,13 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[org.apache.spark.sql.types.StructField])
kryo.register(classOf[org.apache.spark.sql.types.StructType])

// org.hammerlab
kryo.register(classOf[org.hammerlab.bam.header.ContigLengths])
kryo.register(classOf[org.hammerlab.bam.header.Header])
kryo.register(classOf[org.hammerlab.bgzf.Pos])
kryo.register(classOf[org.hammerlab.genomics.reference.ContigName])
kryo.register(Class.forName("org.hammerlab.genomics.reference.package$Locus"))

// scala
kryo.register(classOf[scala.Array[scala.Array[Byte]]])
kryo.register(classOf[scala.Array[htsjdk.variant.vcf.VCFHeader]])
Expand Down Expand Up @@ -343,6 +350,7 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
kryo.register(classOf[scala.Array[String]])
kryo.register(classOf[scala.Array[Option[_]]])
kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))
kryo.register(Class.forName("scala.math.Ordering$Int$"))

// scala.collection
kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
Expand All @@ -354,9 +362,12 @@ class ADAMKryoRegistrator extends KryoRegistrator with Logging {
// scala.collection.immutable
kryo.register(classOf[scala.collection.immutable.::[_]])
kryo.register(classOf[scala.collection.immutable.Range])
kryo.register(Class.forName("scala.collection.immutable.RedBlackTree$BlackTree"))
kryo.register(Class.forName("scala.collection.immutable.RedBlackTree$RedTree"))
kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))
kryo.register(Class.forName("scala.collection.immutable.Set$EmptySet$"))
kryo.register(classOf[scala.collection.immutable.TreeMap[_, _]])

// scala.collection.mutable
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(reads.dataset.rdd.count === 20)
}

sparkTest("can read a small .SAM file using spark-bam") {
  val inputPath = testFile("small.sam")

  // Opt in to the spark-bam reader for this SparkContext.
  // NOTE(review): the flag is never reset here — assumes the suite rebuilds
  // the SparkContext (and thus hadoopConfiguration) per test; confirm.
  sc.hadoopConfiguration.setBoolean(ADAMContext.USE_SPARK_BAM, true)

  // Round-trip: load the SAM, write it back out as a single BAM file,
  // and check the record count survives through both the RDD and the Dataset.
  val alignments = sc.loadAlignments(inputPath)
  val outputPath = tmpLocation(".bam")
  alignments.saveAsSam(outputPath, asSingleFile = true)

  assert(sc.loadAlignments(outputPath).rdd.count() === 20)
  assert(sc.loadAlignments(outputPath).dataset.count() === 20)
}

sparkTest("loading a sam file with a bad header and strict stringency should fail") {
val path = testFile("badheader.sam")
intercept[SAMFormatException] {
Expand Down Expand Up @@ -118,6 +128,17 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(reads.count() === 18)
}

sparkTest("can filter a .SAM file based on quality with spark-bam") {
  val inputPath = testFile("small.sam")
  val outputPath = tmpLocation(".bam")

  // Enable the spark-bam code path before loading.
  // NOTE(review): flag is set but not restored — assumes per-test SparkContext; confirm.
  sc.hadoopConfiguration.setBoolean(ADAMContext.USE_SPARK_BAM, true)

  // Keep only mapped reads with mapping quality above 30, then write a
  // single merged BAM and verify the surviving record count.
  val highQualityMapped = sc.loadAlignments(inputPath)
    .transform(rdd => rdd.filter(a => a.getReadMapped && a.getMapq > 30))
  highQualityMapped.saveAsSam(outputPath, asSingleFile = true)

  assert(sc.loadAlignments(outputPath).rdd.count() === 18)
}

test("Can convert to phred") {
assert(successProbabilityToPhred(0.9) === 10)
assert(successProbabilityToPhred(0.99999) === 50)
Expand Down
15 changes: 10 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,13 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.hammerlab</groupId>
<artifactId>genomic-loci_${scala.version.prefix}</artifactId>
<version>1.4.4</version>
<groupId>org.hammerlab.genomics</groupId>
<artifactId>loci_${scala.version.prefix}</artifactId>
<version>2.0.1</version>
<exclusions>
<exclusion>
<groupId>org.bdgenomics.bdg-formats</groupId>
<artifactId>*</artifactId>
<groupId>org.hammerlab</groupId>
<artifactId>iterator_2.10</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -553,6 +553,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hammerlab</groupId>
<artifactId>spark-bam_2.10</artifactId>
<version>${spark-bam.version}</version>
</dependency>
<dependency>
<groupId>com.github.samtools</groupId>
<artifactId>htsjdk</artifactId>
Expand Down

0 comments on commit 084f071

Please sign in to comment.