Merge pull request #884 from fnothaft/caching
[ADAM-883] Add caching to Transform pipeline.
heuermh committed Nov 19, 2015
2 parents f680829 + 9962d5f commit 5845b15
Showing 1 changed file with 41 additions and 4 deletions.
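
Every caching site in the diff below follows the same shape: when -cache is set, persist the stage's input RDD at the storage level parsed from -storage_level, run the stage against the persisted RDD, then unpersist it. Per the flag's usage text, the point is to avoid recomputing upstream stages when a stage consumes its input more than once. A minimal sketch of that shape, with a generic stage function standing in for calls like adamRealignIndels (the helper is illustrative, not part of the commit):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Illustrative helper, not part of the commit: persist the input when
// caching is enabled, run the stage, then release the cached blocks.
def runStageWithCaching[T, U](input: RDD[T], cache: Boolean, sl: StorageLevel)(
  stage: RDD[T] => RDD[U]): RDD[U] = {
  val oldRdd = if (cache) input.persist(sl) else input
  val next = stage(oldRdd)
  if (cache) oldRdd.unpersist()
  next
}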
adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala (45 changes: 41 additions & 4 deletions)
@@ -19,8 +19,9 @@ package org.bdgenomics.adam.cli

 import htsjdk.samtools.ValidationStringency
 import org.apache.parquet.filter2.dsl.Dsl._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.{ Logging, SparkContext }
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.algorithms.consensus._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.models.SnpTable
@@ -105,6 +106,10 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
   var mdTagsFragmentSize: Long = 1000000L
   @Args4jOption(required = false, name = "-md_tag_overwrite", usage = "When adding MD tags to reads, overwrite existing incorrect tags.")
   var mdTagsOverwrite: Boolean = false
+  @Args4jOption(required = false, name = "-cache", usage = "Cache data to avoid recomputing between stages.")
+  var cache: Boolean = false
+  @Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
+  var storageLevel: String = "MEMORY_ONLY"
 }

 class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
@@ -116,6 +121,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
     var adamRecords = rdd
     val sc = rdd.context
+    val sl = StorageLevel.fromString(args.storageLevel)

     val stringencyOpt = Option(args.stringency).map(ValidationStringency.valueOf(_))
@@ -130,29 +136,50 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {
     }

     if (args.locallyRealign) {
+      val oldRdd = if (args.cache) {
+        adamRecords.persist(sl)
+      } else {
+        adamRecords
+      }
+
       log.info("Locally realigning indels.")
       val consensusGenerator = Option(args.knownIndelsFile)
         .fold(new ConsensusGeneratorFromReads().asInstanceOf[ConsensusGenerator])(
           new ConsensusGeneratorFromKnowns(_, sc).asInstanceOf[ConsensusGenerator])

-      adamRecords = adamRecords.adamRealignIndels(
+      adamRecords = oldRdd.adamRealignIndels(
         consensusGenerator,
         isSorted = false,
         args.maxIndelSize,
         args.maxConsensusNumber,
         args.lodThreshold,
         args.maxTargetSize
       )
+
+      if (args.cache) {
+        oldRdd.unpersist()
+      }
     }

     if (args.recalibrateBaseQualities) {
       log.info("Recalibrating base qualities")
+
+      val oldRdd = if (args.cache) {
+        adamRecords.persist(sl)
+      } else {
+        adamRecords
+      }
+
       val knownSnps: SnpTable = createKnownSnpsTable(sc)
-      adamRecords = adamRecords.adamBQSR(
+      adamRecords = oldRdd.adamBQSR(
         sc.broadcast(knownSnps),
         Option(args.observationsPath),
         stringency
       )
+
+      if (args.cache) {
+        oldRdd.unpersist()
+      }
     }

     if (args.coalesce != -1) {
@@ -166,8 +193,18 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[TransformArgs] with Logging {

     // NOTE: For now, sorting needs to be the last transform
     if (args.sortReads) {
+      val oldRdd = if (args.cache) {
+        adamRecords.persist(sl)
+      } else {
+        adamRecords
+      }
+
       log.info("Sorting reads")
-      adamRecords = adamRecords.adamSortReadsByReferencePosition()
+      adamRecords = oldRdd.adamSortReadsByReferencePosition()
+
+      if (args.cache) {
+        oldRdd.unpersist()
+      }
     }

     if (args.mdTagsReferenceFile != null) {
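The -storage_level value is parsed once, up front, with Spark's StorageLevel.fromString, so it accepts the names of Spark's predefined storage levels and fails fast on anything else. A short sketch; the transform invocation in the comment is an assumed example of the 2015-era ADAM CLI, and the paths are placeholders:

import org.apache.spark.storage.StorageLevel

// Illustrative CLI usage (assumed invocation, placeholder paths):
//   adam-submit transform input.bam output.adam -cache -storage_level MEMORY_AND_DISK
val inMemory = StorageLevel.fromString("MEMORY_ONLY")      // the default in the diff above
val spillable = StorageLevel.fromString("MEMORY_AND_DISK") // spills partitions to disk under memory pressure
// Unrecognized names throw IllegalArgumentException; since `sl` is computed
// before any stage runs, a typo fails the job immediately.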
