refactor RDD loading; explicitly load alignments #468

Merged · 1 commit, merged on Nov 14, 2014
@@ -65,6 +65,6 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable {
* @tparam U A predicate to apply on the load.
*/
def adamRecordLoad[U <: ADAMPredicate[AlignmentRecord]](filePath: java.lang.String): JavaAlignmentRecordRDD = {
new JavaAlignmentRecordRDD(ac.adamLoad[AlignmentRecord, U](filePath).toJavaRDD())
new JavaAlignmentRecordRDD(ac.loadAlignments[U](filePath).toJavaRDD())
}
}
@@ -36,7 +36,7 @@ class JavaADAMContextSuite extends SparkFunSuite {

sparkTest("can read a small .SAM file inside of java") {
val path = ClassLoader.getSystemClassLoader.getResource("small.sam").getFile
val reads: RDD[AlignmentRecord] = sc.adamLoad(path)
val reads: RDD[AlignmentRecord] = sc.loadAlignments(path)

val newReads: JavaAlignmentRecordRDD = JavaADAMConduit.conduit(reads)

@@ -68,7 +68,7 @@ class Adam2Fastq(val args: Adam2FastqArgs) extends ADAMSparkCommand[Adam2FastqAr
else
None

var reads: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath, projection = projectionOpt)
var reads: RDD[AlignmentRecord] = sc.loadAlignments(args.inputPath, projection = projectionOpt)

if (args.repartition != -1) {
log.info("Repartitioning reads to to '%d' partitions".format(args.repartition))
@@ -67,7 +67,7 @@ class CalculateDepth(protected val args: CalculateDepthArgs) extends ADAMSparkCo

val proj = Projection(contig, start, cigar, readMapped)

val adamRDD: RDD[AlignmentRecord] = sc.adamLoad(args.adamInputPath, projection = Some(proj))
val adamRDD: RDD[AlignmentRecord] = sc.loadAlignments(args.adamInputPath, projection = Some(proj))
val mappedRDD = adamRDD.filter(_.getReadMapped)

/*
@@ -61,7 +61,7 @@ class CountKmers(protected val args: CountKmersArgs) extends ADAMSparkCommand[Co
ParquetLogger.hadoopLoggerLevel(Level.SEVERE)

// read from disk
var adamRecords: RDD[AlignmentRecord] = sc.adamLoad(
var adamRecords: RDD[AlignmentRecord] = sc.loadAlignments(
args.inputPath,
projection = Some(Projection(AlignmentRecordField.sequence)))

@@ -61,7 +61,7 @@ class FlagStat(protected val args: FlagStatArgs) extends ADAMSparkCommand[FlagSt
AlignmentRecordField.mapq,
AlignmentRecordField.failedVendorQualityChecks)

val adamFile: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath, projection = Some(projection))
val adamFile: RDD[AlignmentRecord] = sc.loadAlignments(args.inputPath, projection = Some(projection))

val (failedVendorQuality, passedVendorQuality) = adamFile.adamFlagStat()

@@ -45,7 +45,7 @@ class MpileupCommand(protected val args: MpileupArgs) extends ADAMSparkCommand[M

def run(sc: SparkContext, job: Job) {

val reads: RDD[AlignmentRecord] = sc.adamLoad(args.readInput, Some(classOf[UniqueMappedReadPredicate]))
val reads: RDD[AlignmentRecord] = sc.loadAlignments(args.readInput, Some(classOf[UniqueMappedReadPredicate]))

val pileups = new PileupTraversable(reads)
for (pileup <- pileups) {
@@ -111,7 +111,7 @@ class PluginExecutor(protected val args: PluginExecutorArgs) extends ADAMSparkCo
}
}

val firstRdd: RDD[AlignmentRecord] = load[AlignmentRecord](sc, args.input, plugin.projection)
val firstRdd: RDD[AlignmentRecord] = sc.loadAlignments(args.input, projection = plugin.projection)

val input = filter match {
case None => firstRdd
@@ -62,7 +62,7 @@ class PrintTags(protected val args: PrintTagsArgs) extends ADAMSparkCommand[Prin
val toCount = if (args.count != null) args.count.split(",").toSet else Set()

val proj = Projection(attributes, primaryAlignment, readMapped, readPaired, failedVendorQualityChecks)
val rdd: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath, projection = Some(proj))
val rdd: RDD[AlignmentRecord] = sc.loadAlignments(args.inputPath, projection = Some(proj))
val filtered = rdd.filter(rec => !rec.getFailedVendorQualityChecks)

if (args.list != null) {
@@ -50,7 +50,7 @@ class Reads2Ref(protected val args: Reads2RefArgs) extends ADAMSparkCommand[Read
val companion = Reads2Ref

def run(sc: SparkContext, job: Job) {
val reads: RDD[AlignmentRecord] = sc.adamLoad(args.readInput, Some(classOf[UniqueMappedReadPredicate]))
val reads: RDD[AlignmentRecord] = sc.loadAlignments(args.readInput, Some(classOf[UniqueMappedReadPredicate]))

val pileups: RDD[Pileup] = reads.adamRecords2Pileup(args.nonPrimary)

@@ -157,6 +157,6 @@ class Transform(protected val args: TransformArgs) extends ADAMSparkCommand[Tran
}

def run(sc: SparkContext, job: Job) {
this.apply(sc.adamLoad(args.inputPath)).adamSave(args)
this.apply(sc.loadAlignments(args.inputPath)).adamSave(args)
}
}
2 changes: 1 addition & 1 deletion adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
@@ -151,7 +151,7 @@ class View(val args: ViewArgs) extends ADAMSparkCommand[ViewArgs] {

def run(sc: SparkContext, job: Job) = {

val reads: RDD[AlignmentRecord] = applyFilters(sc.adamLoad(args.inputPath))
val reads: RDD[AlignmentRecord] = applyFilters(sc.loadAlignments(args.inputPath))

if (args.outputPath != null)
reads.adamAlignedRecordSave(args)
@@ -165,7 +165,7 @@ class VizReads(protected val args: VizReadsArgs) extends ADAMSparkCommand[VizRea
VizReads.refName = args.refName

val proj = Projection(contig, readMapped, readName, start, end)
VizReads.reads = sc.adamLoad(args.inputPath, projection = Some(proj))
VizReads.reads = sc.loadAlignments(args.inputPath, projection = Some(proj))

val server = new org.eclipse.jetty.server.Server(8080)
val handlers = new org.eclipse.jetty.server.handler.ContextHandlerCollection()
@@ -41,7 +41,7 @@ class ViewSuite extends SparkFunSuite {
)
)

reads = transform.apply(sc.adamLoad(inputSamPath)).collect()
reads = transform.apply(sc.loadAlignments(inputSamPath)).collect()
readsCount = reads.size.toInt
}

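The CLI and test changes above are all the same mechanical migration: call sites that used the generic `sc.adamLoad[AlignmentRecord, U](...)` now call the alignment-specific `sc.loadAlignments(...)`. A minimal before/after sketch of that pattern, assuming the 2014 ADAM package layout for the imports (the path and helper name are illustrative, not from this PR):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.rdd.ADAMContext._ // implicit SparkContext -> ADAMContext
import org.bdgenomics.formats.avro.AlignmentRecord

def migrationSketch(sc: SparkContext, path: String): RDD[AlignmentRecord] = {
  // Before this PR: record type and predicate were explicit type parameters, e.g.
  //   sc.adamLoad[AlignmentRecord, UniqueMappedReadPredicate](path)
  // After this PR: the alignment-specific entry point fixes the record type, and
  // predicate/projection remain optional arguments (see ADAMContext below).
  sc.loadAlignments(path)
}
```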
69 changes: 43 additions & 26 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -96,7 +96,15 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
RecordGroupDictionary.fromSAMHeader(samHeader)
}

private[rdd] def adamParquetLoad[T <% SpecificRecord: Manifest, U <: UnboundRecordFilter](
/**
* This method will create a new RDD.
* @param filePath The path to the input data
* @param predicate An optional pushdown predicate to use when reading the data
* @param projection An optional projection schema to use when reading the data
* @tparam T The type of records to return
* @return An RDD with records of the specified type
*/
def adamLoad[T <% SpecificRecord: Manifest, U <: UnboundRecordFilter](
filePath: String,
predicate: Option[Class[U]] = None,
projection: Option[Schema] = None): RDD[T] = {
@@ -173,7 +181,7 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
throw new IllegalArgumentException("If you're reading a BAM/SAM file, the record type must be Read")

} else {
val projected: RDD[T] = adamParquetLoad[T, UnboundRecordFilter](filePath, None, projection = Some(projection))
val projected: RDD[T] = adamLoad[T, UnboundRecordFilter](filePath, None, projection = Some(projection))

val recs: RDD[SequenceRecord] =
if (isADAMRecord) {
@@ -214,52 +222,61 @@ class ADAMContext(val sc: SparkContext) extends Serializable with Logging {
records.map(p => samRecordConverter.convert(p._2.get, seqDict, readGroups))
}

/**
* This method will create a new RDD.
* @param filePath The path to the input data
* @param predicate An optional pushdown predicate to use when reading the data
* @param projection An option projection schema to use when reading the data
* @tparam T The type of records to return
* @return An RDD with records of the specified type
*/
def adamLoad[T <% SpecificRecord: Manifest, U <: ADAMPredicate[T]](
def maybeLoadBam[U <: ADAMPredicate[AlignmentRecord]](
filePath: String,
predicate: Option[Class[U]] = None,
projection: Option[Schema] = None): RDD[T] = {
projection: Option[Schema] = None): Option[RDD[AlignmentRecord]] = {

if (filePath.endsWith(".bam") ||
filePath.endsWith(".sam") && classOf[AlignmentRecord].isAssignableFrom(manifest[T].runtimeClass)) {
if (filePath.endsWith(".bam") || filePath.endsWith(".sam")) {

if (projection.isDefined) {
log.warn("Projection is ignored when loading a BAM file")
}

val reads = adamBamLoad(filePath).asInstanceOf[RDD[T]]
val reads = adamBamLoad(filePath)

applyPredicate(reads, predicate)
Some(applyPredicate(reads, predicate))
} else
None
}

} else if (filePath.endsWith(".ifq") && classOf[AlignmentRecord].isAssignableFrom(manifest[T].runtimeClass)) {
def maybeLoadFastq[U <: ADAMPredicate[AlignmentRecord]](
filePath: String,
predicate: Option[Class[U]] = None,
projection: Option[Schema] = None): Option[RDD[AlignmentRecord]] = {

if (filePath.endsWith(".ifq")) {

if (projection.isDefined) {
log.warn("Projection is ignored when loading an interleaved FASTQ file")
}
val reads = AlignmentRecordContext.adamInterleavedFastqLoad(sc, filePath).asInstanceOf[RDD[T]]
val reads = AlignmentRecordContext.adamInterleavedFastqLoad(sc, filePath)

applyPredicate(reads, predicate)
Some(applyPredicate(reads, predicate))

} else if ((filePath.endsWith(".fastq") || filePath.endsWith(".fq")) &&
classOf[AlignmentRecord].isAssignableFrom(manifest[T].runtimeClass)) {
} else if ((filePath.endsWith(".fastq") || filePath.endsWith(".fq"))) {

if (projection.isDefined) {
log.warn("Projection is ignored when loading a FASTQ file")
}
val reads = AlignmentRecordContext.adamUnpairedFastqLoad(sc, filePath).asInstanceOf[RDD[T]]
val reads = AlignmentRecordContext.adamUnpairedFastqLoad(sc, filePath)

applyPredicate(reads, predicate)
Some(applyPredicate(reads, predicate))
} else
None
}

} else {
adamParquetLoad(filePath, predicate, projection)
}
def loadAlignments[U <: ADAMPredicate[AlignmentRecord]](
filePath: String,
predicate: Option[Class[U]] = None,
projection: Option[Schema] = None): RDD[AlignmentRecord] = {

maybeLoadBam(filePath, predicate, projection)
.orElse(
maybeLoadFastq(filePath, predicate, projection)
).getOrElse(
adamLoad[AlignmentRecord, U](filePath, predicate, projection)
)
}

/**
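The refactor above splits format handling into composable pieces: maybeLoadBam claims .bam/.sam paths, maybeLoadFastq claims .ifq/.fastq/.fq paths, and loadAlignments chains them with orElse before falling back to the Parquet-backed adamLoad. A hedged usage sketch of the resulting entry point (file names are placeholders; imports assume the 2014 package layout):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.rdd.ADAMContext._ // implicit SparkContext -> ADAMContext
import org.bdgenomics.formats.avro.AlignmentRecord

def loadExamples(sc: SparkContext): Unit = {
  // .bam/.sam: parsed by adamBamLoad; a projection, if given, is ignored with a warning.
  val fromSam: RDD[AlignmentRecord] = sc.loadAlignments("reads.sam")

  // .ifq/.fastq/.fq: converted from (interleaved) FASTQ on the fly.
  val fromFastq: RDD[AlignmentRecord] = sc.loadAlignments("reads.fq")

  // Any other path falls through to adamLoad, i.e. an ADAM Parquet store,
  // where predicate and projection are honored.
  val fromParquet: RDD[AlignmentRecord] = sc.loadAlignments("reads.adam")
}
```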
@@ -47,7 +47,7 @@ class NucleotideContigFragmentContext(val sc: SparkContext) extends Serializable
log.info("Converting FASTA to ADAM.")
FastaConverter(remapData, fragmentLength)
} else {
sc.adamParquetLoad(filePath)
sc.adamLoad(filePath)
}
}
}
@@ -180,7 +180,7 @@ class AlignmentRecordContext(val sc: SparkContext) extends Serializable with Log
def loadADAMFromPaths(paths: Seq[Path]): RDD[AlignmentRecord] = {
def loadADAMs(path: Path): (SequenceDictionary, RDD[AlignmentRecord]) = {
val dict = sc.adamDictionaryLoad[AlignmentRecord](path.toString)
val rdd: RDD[AlignmentRecord] = sc.adamLoad(path.toString)
val rdd: RDD[AlignmentRecord] = sc.loadAlignments(path.toString)
(dict, rdd)
}

@@ -30,7 +30,7 @@ class ConsensusGeneratorFromReadsSuite extends SparkFunSuite {

def artificial_reads: RDD[AlignmentRecord] = {
val path = ClassLoader.getSystemClassLoader.getResource("artificial.sam").getFile
sc.adamLoad[AlignmentRecord, UniqueMappedReadPredicate](path)
sc.loadAlignments[UniqueMappedReadPredicate](path)
}

sparkTest("checking search for consensus list for artificial reads") {
@@ -43,7 +43,7 @@ class FieldEnumerationSuite extends SparkFunSuite with BeforeAndAfter {
cleanParquet(readsParquetFile)

// Convert the reads12.sam file into a parquet file
val bamReads: RDD[AlignmentRecord] = sc.adamLoad(readsFilepath)
val bamReads: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath)
bamReads.adamParquetSave(readsParquetFile.getAbsolutePath)
}

@@ -75,7 +75,7 @@ class FieldEnumerationSuite extends SparkFunSuite with BeforeAndAfter {

val p1 = Projection(AlignmentRecordField.readName)

val reads1: RDD[AlignmentRecord] = sc.adamLoad(readsParquetFile.getAbsolutePath, projection = Some(p1))
val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFile.getAbsolutePath, projection = Some(p1))

assert(reads1.count() === 200)

@@ -85,7 +85,7 @@ class FieldEnumerationSuite extends SparkFunSuite with BeforeAndAfter {

val p2 = Projection(AlignmentRecordField.readName, AlignmentRecordField.readMapped)

val reads2: RDD[AlignmentRecord] = sc.adamLoad(readsParquetFile.getAbsolutePath, projection = Some(p2))
val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFile.getAbsolutePath, projection = Some(p2))

assert(reads2.count() === 200)

@@ -33,19 +33,19 @@ class ADAMContextSuite extends SparkFunSuite {
val readsFilepath = ClassLoader.getSystemClassLoader.getResource("unmapped.sam").getFile

// Convert the reads12.sam file into a parquet file
val bamReads: RDD[AlignmentRecord] = sc.adamLoad(readsFilepath)
val bamReads: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath)
assert(bamReads.count === 200)
}

sparkTest("can read a small .SAM file") {
val path = ClassLoader.getSystemClassLoader.getResource("small.sam").getFile
val reads: RDD[AlignmentRecord] = sc.adamLoad(path)
val reads: RDD[AlignmentRecord] = sc.loadAlignments(path)
assert(reads.count() === 20)
}

sparkTest("can filter a .SAM file based on quality") {
val path = ClassLoader.getSystemClassLoader.getResource("small.sam").getFile
val reads: RDD[AlignmentRecord] = sc.adamLoad(path, predicate = Some(classOf[HighQualityReadPredicate]))
val reads: RDD[AlignmentRecord] = sc.loadAlignments(path, predicate = Some(classOf[HighQualityReadPredicate]))
assert(reads.count() === 18)
}

@@ -95,7 +95,7 @@ class GenomicRegionPartitionerSuite extends SparkFunSuite {
import org.bdgenomics.adam.projections.AlignmentRecordField._
Projection(contig, start, readName, readMapped)
}
val rdd: RDD[AlignmentRecord] = sc.adamLoad(filename, projection = Some(p))
val rdd: RDD[AlignmentRecord] = sc.loadAlignments(filename, projection = Some(p))

assert(rdd.count() === 200)

@@ -240,7 +240,7 @@ class ADAMAlignmentRecordRDDFunctionsSuite extends SparkFunSuite {

sparkTest("characterizeTags counts tags in a SAM file correctly") {
val filePath = getClass.getClassLoader.getResource("reads12.sam").getFile
val sam: RDD[AlignmentRecord] = sc.adamLoad(filePath)
val sam: RDD[AlignmentRecord] = sc.loadAlignments(filePath)

val mapCounts: Map[String, Long] = Map(sam.adamCharacterizeTags().collect(): _*)
assert(mapCounts("NM") === 200)
@@ -250,7 +250,7 @@ class ADAMAlignmentRecordRDDFunctionsSuite extends SparkFunSuite {

sparkTest("round trip from ADAM to SAM and back to ADAM produces equivalent Read values") {
val reads12Path = Thread.currentThread().getContextClassLoader.getResource("reads12.sam").getFile
val rdd12A: RDD[AlignmentRecord] = sc.adamLoad(reads12Path)
val rdd12A: RDD[AlignmentRecord] = sc.loadAlignments(reads12Path)

val tempFile = Files.createTempDirectory("reads12")
rdd12A.adamSAMSave(tempFile.toAbsolutePath.toString + "/reads12.sam", asSam = true)
@@ -273,19 +273,19 @@ class ADAMAlignmentRecordRDDFunctionsSuite extends SparkFunSuite {

sparkTest("SAM conversion sets read mapped flag properly") {
val filePath = getClass.getClassLoader.getResource("reads12.sam").getFile
val sam: RDD[AlignmentRecord] = sc.adamLoad(filePath)
val sam: RDD[AlignmentRecord] = sc.loadAlignments(filePath)

sam.collect().foreach(r => assert(r.getReadMapped))
}

sparkTest("round trip from ADAM to FASTQ and back to ADAM produces equivalent Read values") {
val reads12Path = Thread.currentThread().getContextClassLoader.getResource("interleaved_fastq_sample1.fq").getFile
val rdd12A: RDD[AlignmentRecord] = sc.adamLoad(reads12Path)
val rdd12A: RDD[AlignmentRecord] = sc.loadAlignments(reads12Path)

val tempFile = Files.createTempDirectory("reads12")
rdd12A.adamSaveAsFastq(tempFile.toAbsolutePath.toString + "/reads12.fq")

val rdd12B: RDD[AlignmentRecord] = sc.adamLoad(tempFile.toAbsolutePath.toString + "/reads12.fq")
val rdd12B: RDD[AlignmentRecord] = sc.loadAlignments(tempFile.toAbsolutePath.toString + "/reads12.fq")

assert(rdd12B.count() === rdd12A.count())

@@ -135,7 +135,7 @@ class TrimReadsSuite extends SparkFunSuite {

sparkTest("adaptively trim reads") {
val readsFilepath = ClassLoader.getSystemClassLoader.getResource("bqsr1.sam").getFile
val reads: RDD[AlignmentRecord] = sc.adamLoad(readsFilepath)
val reads: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath)

// put all reads into a single read group
val readsSingleRG = reads.map(read => {
@@ -30,12 +30,12 @@ class IndelRealignmentTargetSuite extends SparkFunSuite {
// Note: this can't be lazy vals because Spark won't find the RDDs after the first test
def mason_reads: RDD[RichAlignmentRecord] = {
val path = ClassLoader.getSystemClassLoader.getResource("small_realignment_targets.sam").getFile
sc.adamLoad[AlignmentRecord, UniqueMappedReadPredicate](path).map(RichAlignmentRecord(_))
sc.loadAlignments[UniqueMappedReadPredicate](path).map(RichAlignmentRecord(_))
}

def artificial_reads: RDD[RichAlignmentRecord] = {
val path = ClassLoader.getSystemClassLoader.getResource("artificial.sam").getFile
sc.adamLoad[AlignmentRecord, UniqueMappedReadPredicate](path).map(RichAlignmentRecord(_))
sc.loadAlignments[UniqueMappedReadPredicate](path).map(RichAlignmentRecord(_))
}

def make_read(start: Long, cigar: String, mdtag: String, length: Int, refLength: Int, id: Int = 0): RichAlignmentRecord = {
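The test updates above show the two ways the new signature accepts a pushdown predicate: as an explicit type parameter (loadAlignments[UniqueMappedReadPredicate](path)) or as a predicate argument. A short sketch of the argument form, mirroring the MpileupCommand and ADAMContextSuite call sites (imports are assumptions about the 2014 package layout):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.predicates.UniqueMappedReadPredicate
import org.bdgenomics.adam.rdd.ADAMContext._ // implicit SparkContext -> ADAMContext
import org.bdgenomics.formats.avro.AlignmentRecord

def mappedReads(sc: SparkContext, path: String): RDD[AlignmentRecord] =
  // The predicate class is applied after parsing for BAM/SAM and FASTQ inputs
  // (via applyPredicate) and passed through to adamLoad for Parquet inputs.
  sc.loadAlignments(path, predicate = Some(classOf[UniqueMappedReadPredicate]))
```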