From 7a2d4dccdd8bed25e56af68388ba2b64a199e99d Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Wed, 4 Dec 2013 17:15:46 -0800
Subject: [PATCH 1/5] disambiguate isMismatchBase(), other small cleanups

---
 .../rdd/recalibration/ReadCovariates.scala   | 12 +++---
 .../cs/amplab/adam/rich/RichADAMRecord.scala | 40 +++++++++++--------
 2 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
index 712b03341f..2886f061c5 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
@@ -38,15 +38,15 @@ class ReadCovariates(val read: ADAMRecord, qualByRG: QualByRG, covars: List[Stan
   override def hasNext: Boolean = iter_position < endOffset
 
   override def next(): BaseCovariates = {
-    val idx = (iter_position - startOffset).toInt
-    val position = read.getPosition(idx)
+    val offset = (iter_position - startOffset).toInt
+    val position = read.offsetToPosition(offset)
     val isMasked = dbsnp == null || position.isEmpty ||
       dbsnp.value(read.getReferenceName.toString).contains(position.get.toInt) ||
-      read.isMismatchBase(idx).isEmpty
-    val isMisMatch = read.isMismatchBase(idx).getOrElse(false) // getOrElse because reads without an MD tag can appear during *application* of recal table
+      read.isMismatchAtOffset(offset).isEmpty
+    val isMisMatch = read.isMismatchAtOffset(offset).getOrElse(false) // getOrElse because reads without an MD tag can appear during *application* of recal table
     iter_position += 1
-    new BaseCovariates(qualCovar(idx), requestedCovars.map(v => v(idx)).toArray,
-      read.qualityScores(idx), isMisMatch, isMasked)
+    new BaseCovariates(qualCovar(offset), requestedCovars.map(v => v(offset)).toArray,
+      read.qualityScores(offset), isMisMatch, isMasked)
   }
 }

diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
index ebfd489125..0d31b99dbc 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
@@ -63,7 +63,7 @@ class RichADAMRecord(record: ADAMRecord) {
       Some(samtoolsCigar.getCigarElements
         .filter(p => p.getOperator.consumesReferenceBases())
         .foldLeft(record.getStart) {
-        (pos, cigarEl) => pos + cigarEl.getLength
+          (pos, cigarEl) => pos + cigarEl.getLength
       })
     } else {
       None
@@ -101,37 +101,39 @@ class RichADAMRecord(record: ADAMRecord) {
     }
   }
 
-  lazy val mdEvent: Option[MdTag] = if (record.getMismatchingPositions != null) {
-    Some(MdTag(record.getMismatchingPositions.toString, record.getStart))
-  } else {
-    None
+  lazy val mdEvent: Option[MdTag] = {
+    if (record.getMismatchingPositions != null) {
+      Some(MdTag(record.getMismatchingPositions.toString, record.getStart))
+    } else {
+      None
+    }
   }
 
+  // does this read overlap with the given position?
   def overlapsPosition(pos: Long): Option[Boolean] = {
     if (record.getReadMapped) {
-      Some(record.getStart <= pos && end.get > pos)
+      Some(record.getStart <= pos && pos < end.get)
     } else {
       None
     }
   }
 
-  def isMismatchBase(pos: Long): Option[Boolean] = {
-    if (mdEvent.isEmpty || !overlapsPosition(pos).get)
+  // does this read mismatch the reference at the given *reference* position?
+  def isMismatchAtPosition(pos: Long): Option[Boolean] = {
+    if (mdEvent.isEmpty || !overlapsPosition(pos).get) {
       None
-    else
+    } else {
       Some(!mdEvent.get.isMatch(pos))
+    }
   }
 
-  def isMismatchBase(offset: Int): Option[Boolean] = {
+  // does this read mismatch the reference at the given offset within the read?
+  def isMismatchAtOffset(offset: Int): Option[Boolean] = {
     // careful about offsets that are within an insertion!
     if (referencePositions.isEmpty) {
       None
     } else {
-      val pos = referencePositions(offset)
-      if (pos.isEmpty)
-        None
-      else
-        isMismatchBase(pos.get)
+      offsetToPosition(offset).flatMap(isMismatchAtPosition)
     }
   }
 
@@ -169,5 +171,11 @@ class RichADAMRecord(record: ADAMRecord) {
   }
 
   // get the reference position of an offset within the read
-  def getPosition(offset: Int): Option[Long] = if (record.getReadMapped) referencePositions(offset) else None
+  def offsetToPosition(offset: Int): Option[Long] = {
+    if (record.getReadMapped) {
+      referencePositions(offset)
+    } else {
+      None
+    }
+  }
 }
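
A note on the distinction this patch draws: a read offset is an index into the read's own bases, while a reference position is a locus on the contig, and the two do not map one-to-one. A base inside an insertion has no reference position at all, which is why offsetToPosition returns Option[Long] and why isMismatchAtOffset warns about insertions. A minimal, self-contained sketch of that mapping (a toy CIGAR walker with made-up values, not ADAM's actual Cigar/MdTag machinery):

    object OffsetVsPosition {
      // (operator, length): 'M' consumes read and reference, 'I' consumes read only
      val cigar = List(('M', 2), ('I', 2), ('M', 3))
      val start = 100L

      // reference position for each read offset; None inside the insertion
      val refPositions: List[Option[Long]] = {
        var pos = start
        cigar.flatMap {
          case ('M', len) =>
            val block = (0 until len).map(i => Some(pos + i)); pos += len; block
          case ('I', len) =>
            (0 until len).map(_ => None)
        }
      }

      def main(args: Array[String]): Unit = {
        // offsets 0..1 -> Some(100), Some(101); offsets 2..3 -> None; offsets 4..6 -> Some(102)..Some(104)
        refPositions.zipWithIndex.foreach { case (p, off) => println(s"offset $off -> $p") }
      }
    }
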
From b12fa58485cdde382e8bfa248a92c75f788c52d4 Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Fri, 6 Dec 2013 12:06:35 -0800
Subject: [PATCH 2/5] Factor out SnpTable, fix crash in SNP handling

---
 .../cs/amplab/adam/cli/Transform.scala        | 17 ++++-
 .../cs/amplab/adam/models/SnpTable.scala      | 63 +++++++++++++++++++
 .../cs/amplab/adam/rdd/AdamRDDFunctions.scala | 16 +----
 .../adam/rdd/RecalibrateBaseQualities.scala   |  7 ++-
 .../rdd/recalibration/ReadCovariates.scala    | 19 +++---
 5 files changed, 95 insertions(+), 27 deletions(-)
 create mode 100644 adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala

diff --git a/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/Transform.scala b/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/Transform.scala
index 562720b56f..7b409548e8 100644
--- a/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/Transform.scala
+++ b/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/Transform.scala
@@ -20,6 +20,7 @@ import edu.berkeley.cs.amplab.adam.util.ParquetLogger
 import org.kohsuke.args4j.{Argument, Option => Args4jOption}
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
 import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
+import edu.berkeley.cs.amplab.adam.models.SnpTable
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.rdd.RDD
 import java.io.File
@@ -46,7 +47,7 @@ class TransformArgs extends Args4jBase with ParquetArgs with SparkArgs {
   @Args4jOption(required = false, name = "-recalibrate_base_qualities", usage = "Recalibrate the base quality scores (ILLUMINA only)")
   var recalibrateBaseQualities: Boolean = false
   @Args4jOption(required = false, name = "-dbsnp_sites", usage = "dbsnp sites file")
-  var dbsnpSitesFile: File = null
+  var dbsnpSitesFile: String = null
   @Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
   var coalesce: Int = -1
 }
@@ -74,7 +75,8 @@ class Transform(protected val args: TransformArgs) extends AdamSparkCommand[Tran
 
     if (args.recalibrateBaseQualities) {
       log.info("Recalibrating base qualities")
-      adamRecords = adamRecords.adamBQSR(args.dbsnpSitesFile)
+      val dbSNP = loadSnpTable(sc)
+      adamRecords = adamRecords.adamBQSR(dbSNP)
     }
 
     // NOTE: For now, sorting needs to be the last transform
@@ -87,6 +89,17 @@ class Transform(protected val args: TransformArgs) extends AdamSparkCommand[Tran
       compressCodec = args.compressionCodec, disableDictionaryEncoding = args.disableDictionary)
   }
 
+  // FIXME: why doesn't this complain if the file doesn't exist?
+  def loadSnpTable(sc: SparkContext): SnpTable = {
+    if (args.dbsnpSitesFile != null) {
+      log.info("Loading SNP table")
+      //SnpTable(sc.textFile(args.dbsnpSitesFile))
+      SnpTable(new File(args.dbsnpSitesFile))
+    } else {
+      SnpTable()
+    }
+  }
+
 }
diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala
new file mode 100644
index 0000000000..c8a7140841
--- /dev/null
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala
@@ -0,0 +1,63 @@
+package edu.berkeley.cs.amplab.adam.models
+
+import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
+import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import scala.collection.immutable._
+import scala.collection.mutable
+import java.io.File
+
+class SnpTable(private val table: Map[String, Set[Long]]) extends Serializable with Logging {
+  log.info("SNP table has %s contigs and %s entries".format(table.size, table.values.map(_.size).sum))
+
+  def isMaskedAtOffset(read: ADAMRecord, offset: Int): Boolean = {
+    val position = read.offsetToPosition(offset)
+    try {
+      position.isEmpty || table(read.getReferenceName.toString).contains(position.get)
+    } catch {
+      case e: java.util.NoSuchElementException =>
+        false
+    }
+  }
+}
+
+object SnpTable {
+  def apply(): SnpTable = {
+    new SnpTable(Map[String, Set[Long]]())
+  }
+
+  // `dbSNP` is expected to be a sites-only VCF
+  def apply(dbSNP: File): SnpTable = {
+    // parse into tuples of (contig, position)
+    val lines = scala.io.Source.fromFile(dbSNP).getLines()
+    val tuples = lines.filter(line => !line.startsWith("#")).map(line => {
+      val split = line.split("\t")
+      val contig = split(0)
+      val pos = split(1).toLong
+      (contig, pos)
+    })
+    // construct map from contig to set of positions
+    // this is done in-place to reduce overhead
+    val table = new mutable.HashMap[String, mutable.HashSet[Long]]
+    tuples.foreach(tup => table.getOrElseUpdate(tup._1, { new mutable.HashSet[Long] }) += tup._2)
+    // construct SnpTable from immutable copy of `table`
+    new SnpTable(table.mapValues(_.toSet).toMap)
+  }
+
+  /*
+  def apply(lines: RDD[String]): SnpTable = {
+    // parse into tuples of (contig, position)
+    val tuples = lines.filter(line => !line.startsWith("#")).map(line => {
+      val split = line.split("\t")
+      val contig = split(0)
+      val pos = split(1).toLong
+      (contig, pos)
+    })
+    // construct map from contig to set of positions
+    val table = tuples.groupByKey.collect.toMap.mapValues(_.toSet)
+    new SnpTable(table)
+  }
+  */
+}
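
For reference, the apply(File) parser above looks only at the first two tab-separated columns of each non-header line. Given a sites-only VCF like the following (made-up rows; the rsIDs and positions are illustrative, not real dbSNP data):

    ##fileformat=VCFv4.1
    #CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO
    1	10019	rs000001	TA	T	.	.	.
    1	10054	rs000002	C	A	.	.	.
    2	4204	rs000003	G	A	.	.	.

the resulting table is Map("1" -> Set(10019L, 10054L), "2" -> Set(4204L)). Note that isMaskedAtOffset treats a contig absent from the table as unmasked (the try/catch returning false), which is the crash fix this commit message refers to.
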
diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala
index 894fdf552e..066f70112b 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala
@@ -22,7 +22,7 @@ import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
 import parquet.hadoop.util.ContextUtil
 import org.apache.avro.specific.SpecificRecord
 import edu.berkeley.cs.amplab.adam.avro.{ADAMPileup, ADAMRecord, ADAMVariant, ADAMGenotype, ADAMVariantDomain}
-import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, SingleReadBucket, ReferencePosition, ADAMRod}
+import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, SingleReadBucket, SnpTable, ReferencePosition, ADAMRod}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.Logging
@@ -100,18 +100,8 @@ class AdamRecordRDDFunctions(rdd: RDD[ADAMRecord]) extends Serializable with Log
     MarkDuplicates(rdd)
   }
 
-  def adamBQSR(dbSNP: File): RDD[ADAMRecord] = {
-    val dbsnpMap = scala.io.Source.fromFile(dbSNP).getLines().map(posLine => {
-      val split = posLine.split("\t")
-      val contig = split(0)
-      val pos = split(1).toInt
-      (contig, pos)
-    }).foldLeft(Map[String, Set[Int]]())((dbMap, pair) => {
-      dbMap + (pair._1 -> (dbMap.getOrElse(pair._1, Set[Int]()) + pair._2))
-    })
-
-    val broadcastDbSNP = rdd.context.broadcast(dbsnpMap)
-
+  def adamBQSR(dbSNP: SnpTable): RDD[ADAMRecord] = {
+    val broadcastDbSNP = rdd.context.broadcast(dbSNP)
     RecalibrateBaseQualities(rdd, broadcastDbSNP)
   }

diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala
index 1c60316ff0..35ac2032ef 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala
@@ -18,6 +18,7 @@ package edu.berkeley.cs.amplab.adam.rdd
 import org.apache.spark.Logging
 import org.apache.spark.broadcast.{Broadcast => SparkBroadcast}
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.models.SnpTable
 import edu.berkeley.cs.amplab.adam.rdd.recalibration._
 import org.apache.spark.rdd.RDD
 
@@ -28,7 +29,7 @@ private[rdd] object RecalibrateBaseQualities extends Serializable with Logging {
     read.getReadMapped && read.getPrimaryAlignment && !read.getDuplicateRead && (read.getMismatchingPositions != null)
   }
 
-  def apply(rdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[Map[String, Set[Int]]]): RDD[ADAMRecord] = {
+  def apply(rdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RDD[ADAMRecord] = {
     // initialize the covariates
     println("Instantiating covariates...")
     val qualByRG = new QualByRG(rdd)
@@ -45,7 +46,7 @@ private[rdd] class RecalibrateBaseQualities(val qualCovar: QualByRG, val covars:
 
   initLogging()
 
-  def computeTable(rdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[Map[String, Set[Int]]]): RecalTable = {
+  def computeTable(rdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RecalTable = {
 
     def addCovariates(table: RecalTable, covar: ReadCovariates): RecalTable = {
       //log.info("Aggregating covarates for read "+covar.read.record.getReadName.toString)
@@ -57,7 +58,7 @@ private[rdd] class RecalibrateBaseQualities(val qualCovar: QualByRG, val covars:
       table1 ++ table2
     }
 
-    rdd.map(r => ReadCovariates(r, qualCovar, covars, dbsnp)).aggregate(new RecalTable)(addCovariates, mergeTables)
+    rdd.map(r => ReadCovariates(r, qualCovar, covars, dbsnp.value)).aggregate(new RecalTable)(addCovariates, mergeTables)
   }
 
   def applyTable(table: RecalTable, rdd: RDD[ADAMRecord], qualCovar: QualByRG, covars: List[StandardCovariate]): RDD[ADAMRecord] = {
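
Why adamBQSR broadcasts the table instead of capturing it in a closure: Spark ships a broadcast variable to each executor once and caches it there, rather than re-serializing it into every task, which matters for a large dbSNP table. A generic sketch of the pattern using only the standard Spark API (the function and variable names here are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Broadcast a lookup table once, then read it via .value inside tasks.
    def maskKnownSites(sc: SparkContext, positions: RDD[Long], knownSites: Set[Long]): RDD[Boolean] = {
      val bcSites = sc.broadcast(knownSites)        // shipped to each executor once
      positions.map(p => bcSites.value.contains(p)) // tasks read the cached copy
    }
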
b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
@@ -17,16 +17,17 @@ package edu.berkeley.cs.amplab.adam.rdd.recalibration
 
 import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
-import org.apache.spark.broadcast.{Broadcast => SparkBroadcast}
+import edu.berkeley.cs.amplab.adam.models.SnpTable
 
 object ReadCovariates {
-  def apply(rec: ADAMRecord, qualRG: QualByRG, covars: List[StandardCovariate], dbsnp: SparkBroadcast[Map[String, Set[Int]]] = null): ReadCovariates = {
+  def apply(rec: ADAMRecord, qualRG: QualByRG, covars: List[StandardCovariate],
+            dbsnp: SnpTable = SnpTable()): ReadCovariates = {
     new ReadCovariates(rec, qualRG, covars, dbsnp)
   }
 }
 
 class ReadCovariates(val read: ADAMRecord, qualByRG: QualByRG, covars: List[StandardCovariate],
-                     val dbsnp: SparkBroadcast[Map[String, Set[Int]]] = null) extends Iterator[BaseCovariates] with Serializable {
+                     val dbSNP: SnpTable) extends Iterator[BaseCovariates] with Serializable {
 
   val startOffset = read.qualityScores.takeWhile(_ <= 2).size
   val endOffset = read.qualityScores.size - read.qualityScores.reverseIterator.takeWhile(_ <= 2).size
@@ -39,14 +40,14 @@ class ReadCovariates(val read: ADAMRecord, qualByRG: QualByRG, covars: List[Stan
 
   override def next(): BaseCovariates = {
     val offset = (iter_position - startOffset).toInt
-    val position = read.offsetToPosition(offset)
-    val isMasked = dbsnp == null || position.isEmpty ||
-      dbsnp.value(read.getReferenceName.toString).contains(position.get.toInt) ||
-      read.isMismatchAtOffset(offset).isEmpty
-    val isMisMatch = read.isMismatchAtOffset(offset).getOrElse(false) // getOrElse because reads without an MD tag can appear during *application* of recal table
+    val mismatch = read.isMismatchAtOffset(offset)
+    // FIXME: why does empty mismatch mean it should be masked?
+    val isMasked = dbSNP.isMaskedAtOffset(read, offset) || mismatch.isEmpty
+    // getOrElse because reads without an MD tag can appear during *application* of recal table
+    val isMismatch = mismatch.getOrElse(false)
     iter_position += 1
     new BaseCovariates(qualCovar(offset), requestedCovars.map(v => v(offset)).toArray,
-      read.qualityScores(offset), isMisMatch, isMasked)
+      read.qualityScores(offset), isMismatch, isMasked)
   }
 }

From 2735cd1f0e2f3aa62fa4ca7561cceeceed196ff9 Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Thu, 19 Dec 2013 15:50:46 -0800
Subject: [PATCH 3/5] Print a log message on startup to more easily tell runs
 apart

---
 .../edu/berkeley/cs/amplab/adam/cli/AdamMain.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/AdamMain.scala b/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/AdamMain.scala
index bc11a87361..1a16dd8fca 100644
--- a/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/AdamMain.scala
+++ b/adam-cli/src/main/scala/edu/berkeley/cs/amplab/adam/cli/AdamMain.scala
@@ -15,9 +15,10 @@
  */
 package edu.berkeley.cs.amplab.adam.cli
 
+import org.apache.spark.Logging
 import scala.Some
 
-object AdamMain {
+object AdamMain extends Logging {
   private val commands = List(Bam2Adam,
                               Transform,
@@ -48,6 +49,7 @@ object AdamMain {
   }
 
   def main(args: Array[String]) {
+    log.info("ADAM invoked with args: %s".format(argsToString(args)))
     if (args.size < 1) {
       printCommands()
     } else {
@@ -57,4 +59,11 @@ object AdamMain {
       }
     }
   }
+
+  // Attempts to format the `args` array into a string in a way
+  // suitable for copying and pasting back into the shell.
+  private def argsToString(args: Array[String]): String = {
+    def escapeArg(s: String) = "\"" + s.replaceAll("\\\"", "\\\\\"") + "\""
+    args.map(escapeArg).mkString(" ")
+  }
 }
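
To make the escaping in argsToString concrete: each argument is wrapped in double quotes and any embedded double quote is backslash-escaped (replaceAll takes a regex, so the pattern \\\" in Scala source matches a literal " character). For example:

    argsToString(Array("transform", "reads \"v2\".adam"))
    // yields: "transform" "reads \"v2\".adam"

Backslashes themselves are not escaped, so the result is best-effort rather than fully shell-safe, hence the "Attempts to" wording in the comment.
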
From 4b135f632fb37b46bccce24d58d118409b617144 Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Wed, 8 Jan 2014 23:09:04 -0800
Subject: [PATCH 4/5] Improve BQSR performance by reducing GC pressure

---
 .../adam/rdd/RecalibrateBaseQualities.scala   | 13 ++++++-----
 .../rdd/recalibration/ReadCovariates.scala    |  7 +++---
 .../adam/rdd/recalibration/RecalUtil.scala    |  6 +++--
 .../rdd/recalibration/StandardCovariate.scala | 22 ++++++++++---------
 .../cs/amplab/adam/rich/RichADAMRecord.scala  |  3 ++-
 .../rdd/RecalibrateBaseQualitiesSuite.scala   |  4 +++-
 6 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala
index 35ac2032ef..1eafb884e8 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala
@@ -18,18 +18,21 @@ package edu.berkeley.cs.amplab.adam.rdd
 import org.apache.spark.Logging
 import org.apache.spark.broadcast.{Broadcast => SparkBroadcast}
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord._
 import edu.berkeley.cs.amplab.adam.models.SnpTable
 import edu.berkeley.cs.amplab.adam.rdd.recalibration._
 import org.apache.spark.rdd.RDD
 
 private[rdd] object RecalibrateBaseQualities extends Serializable with Logging {
 
-  def usableRead(read: ADAMRecord): Boolean = {
+  def usableRead(read: RichADAMRecord): Boolean = {
     // todo -- the mismatchingPositions should not merely be a filter, it should result in an exception. These are required for calculating mismatches.
     read.getReadMapped && read.getPrimaryAlignment && !read.getDuplicateRead && (read.getMismatchingPositions != null)
   }
 
-  def apply(rdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RDD[ADAMRecord] = {
+  def apply(poorRdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RDD[ADAMRecord] = {
+    val rdd = poorRdd.map(new RichADAMRecord(_))
     // initialize the covariates
     println("Instantiating covariates...")
     val qualByRG = new QualByRG(rdd)
@@ -46,7 +49,7 @@ private[rdd] class RecalibrateBaseQualities(val qualCovar: QualByRG, val covars:
 
   initLogging()
 
-  def computeTable(rdd: RDD[ADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RecalTable = {
+  def computeTable(rdd: RDD[RichADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RecalTable = {
 
     def addCovariates(table: RecalTable, covar: ReadCovariates): RecalTable = {
       //log.info("Aggregating covarates for read "+covar.read.record.getReadName.toString)
@@ -61,9 +64,9 @@ private[rdd] class RecalibrateBaseQualities(val qualCovar: QualByRG, val covars:
     rdd.map(r => ReadCovariates(r, qualCovar, covars, dbsnp.value)).aggregate(new RecalTable)(addCovariates, mergeTables)
   }
 
-  def applyTable(table: RecalTable, rdd: RDD[ADAMRecord], qualCovar: QualByRG, covars: List[StandardCovariate]): RDD[ADAMRecord] = {
+  def applyTable(table: RecalTable, rdd: RDD[RichADAMRecord], qualCovar: QualByRG, covars: List[StandardCovariate]): RDD[ADAMRecord] = {
     table.finalizeTable()
-    def recalibrate(record: ADAMRecord): ADAMRecord = {
+    def recalibrate(record: RichADAMRecord): ADAMRecord = {
       if (!record.getReadMapped || !record.getPrimaryAlignment || record.getDuplicateRead) {
         record // no need to recalibrate these records todo -- enable optional recalibration of all reads
       } else {
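
The line doing most of the work for GC pressure here is val rdd = poorRdd.map(new RichADAMRecord(_)): before this change, the implicit recordToRichRecord conversion allocated a fresh wrapper, and recomputed fields like qualityScores, at every call site, whereas wrapping each record once lets RichADAMRecord's lazy vals be computed a single time and reused. A stripped-down sketch of the difference, with made-up types rather than ADAM's actual classes:

    final class Raw(val qual: String)

    final class Rich(val raw: Raw) {
      // computed at most once per wrapper instance, then cached
      lazy val qualityScores: Array[Byte] = raw.qual.toCharArray.map(q => (q - 33).toByte)
    }

    object WrapOnce {
      implicit def rawToRich(r: Raw): Rich = new Rich(r) // allocates on *every* use

      def main(args: Array[String]): Unit = {
        val raw = new Raw("IIII")
        // Two implicit conversions: two Rich allocations, scores decoded twice.
        println(raw.qualityScores.length + raw.qualityScores.length)
        // Wrap once: one allocation, scores decoded once and reused.
        val rich = new Rich(raw)
        println(rich.qualityScores.length + rich.qualityScores.length)
      }
    }
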
diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
index 0ecc4f4d0d..11561b31c9 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
@@ -16,17 +16,18 @@
 package edu.berkeley.cs.amplab.adam.rdd.recalibration
 
 import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
-import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord._
 import edu.berkeley.cs.amplab.adam.models.SnpTable
 
 object ReadCovariates {
-  def apply(rec: ADAMRecord, qualRG: QualByRG, covars: List[StandardCovariate],
+  def apply(rec: RichADAMRecord, qualRG: QualByRG, covars: List[StandardCovariate],
             dbsnp: SnpTable = SnpTable()): ReadCovariates = {
     new ReadCovariates(rec, qualRG, covars, dbsnp)
   }
 }
 
-class ReadCovariates(val read: ADAMRecord, qualByRG: QualByRG, covars: List[StandardCovariate],
+class ReadCovariates(val read: RichADAMRecord, qualByRG: QualByRG, covars: List[StandardCovariate],
                      val dbSNP: SnpTable) extends Iterator[BaseCovariates] with Serializable {
 
   val startOffset = read.qualityScores.takeWhile(_ <= 2).size
   val endOffset = read.qualityScores.size - read.qualityScores.reverseIterator.takeWhile(_ <= 2).size
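
A quick trace of the startOffset/endOffset bounds above, which trim the runs of quality <= 2 (the Illumina "no-call" quality) from both ends of the read so that covariates are only emitted for interior bases:

    val quals: Array[Byte] = Array(2, 2, 30, 25, 40, 2)
    val startOffset = quals.takeWhile(_ <= 2).size                               // 2
    val endOffset   = quals.size - quals.reverseIterator.takeWhile(_ <= 2).size  // 6 - 1 = 5
    // the iterator then runs iter_position from 2 until 5, covering the three interior bases
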
a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/RecalUtil.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/RecalUtil.scala
@@ -16,6 +16,8 @@
 package edu.berkeley.cs.amplab.adam.rdd.recalibration
 
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord._
 
 object RecalUtil extends Serializable {
 
@@ -29,7 +31,7 @@ object RecalUtil extends Serializable {
 
   def errorProbToQual(d: Double): Byte = (-10 * math.log10(d)).toInt.toByte
 
-  def recalibrate(read: ADAMRecord, qualByRG: QualByRG, covars: List[StandardCovariate], table: RecalTable): ADAMRecord = {
+  def recalibrate(read: RichADAMRecord, qualByRG: QualByRG, covars: List[StandardCovariate], table: RecalTable): ADAMRecord = {
     // get the covariates
     val readCovariates = ReadCovariates(read, qualByRG, covars)
     val toQual = errorProbToQual _
@@ -42,4 +44,4 @@ object RecalUtil extends Serializable {
     builder.build()
   }
 
-}
\ No newline at end of file
+}
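
errorProbToQual above is the inverse Phred transform, Q = -10 * log10(p), truncated toward zero and narrowed to a byte. A couple of worked values:

    errorProbToQual(0.01)   // -10 * log10(0.01)   = 20.0 -> Q20
    errorProbToQual(0.001)  // -10 * log10(0.001)  = 30.0 -> Q30
    errorProbToQual(0.0012) // -10 * log10(0.0012) = 29.2 -> truncated to Q29
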
diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala
index 31c744f838..757b1a932f 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala
@@ -17,15 +17,17 @@ package edu.berkeley.cs.amplab.adam.rdd.recalibration
 
 import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord._
 import org.apache.spark.rdd.RDD
 
 // this class is required, not just standard. Baked in to recalibration.
-class QualByRG(rdd: RDD[ADAMRecord]) extends Serializable {
+class QualByRG(rdd: RDD[RichADAMRecord]) extends Serializable {
   // need to get the unique read groups todo --- this is surprisingly slow
   //val readGroups = rdd.map(_.getRecordGroupId.toString).distinct().collect().sorted.zipWithIndex.toMap
   var readGroups = Map[String, Int]()
 
-  def apply(read: ADAMRecord, start: Int, end: Int): Array[Int] = {
+  def apply(read: RichADAMRecord, start: Int, end: Int): Array[Int] = {
     if (!readGroups.contains(read.getRecordGroupId.asInstanceOf[String])) {
       readGroups += (read.getRecordGroupId.asInstanceOf[String] -> readGroups.size)
     }
@@ -37,13 +39,13 @@ class QualByRG(rdd: RDD[ADAMRecord]) extends Serializable {
 }
 
 trait StandardCovariate extends Serializable {
-  def apply(read: ADAMRecord, start: Int, end: Int): Array[Int] // get the covariate for all the bases of the read
+  def apply(read: RichADAMRecord, start: Int, end: Int): Array[Int] // get the covariate for all the bases of the read
 }
 
-case class DiscreteCycle(args: RDD[ADAMRecord]) extends StandardCovariate {
+case class DiscreteCycle(args: RDD[RichADAMRecord]) extends StandardCovariate {
   // this is a special-case of the GATK's Cycle covariate for discrete technologies.
   // Not to be used for 454 or ion torrent (which are flow cycles)
-  def apply(read: ADAMRecord, startOffset: Int, endOffset: Int): Array[Int] = {
+  def apply(read: RichADAMRecord, startOffset: Int, endOffset: Int): Array[Int] = {
     var cycles: Array[Int] = if (read.getReadNegativeStrand) Range(read.getSequence.toString.size, 0, -1).toArray
     else Range(1, 1 + read.getSequence.toString.size, 1).toArray
     cycles = if (read.getReadPaired && read.getSecondOfPair) cycles.map(-_) else cycles
@@ -51,17 +53,17 @@ case class DiscreteCycle(args: RDD[ADAMRecord]) extends StandardCovariate {
   }
 }
 
-case class BaseContext(records: RDD[ADAMRecord], size: Int) extends StandardCovariate {
+case class BaseContext(records: RDD[RichADAMRecord], size: Int) extends StandardCovariate {
   def this(_s: Int) = this(null, _s)
-  def this(_r: RDD[ADAMRecord]) = this(_r, 2)
+  def this(_r: RDD[RichADAMRecord]) = this(_r, 2)
 
   val BASES = Array('A'.toByte, 'C'.toByte, 'G'.toByte, 'T'.toByte)
   val COMPL = Array('T'.toByte, 'G'.toByte, 'C'.toByte, 'A'.toByte)
   val N_BASE = 'N'.toByte
   val COMPL_MP = (BASES zip COMPL toMap) + (N_BASE -> N_BASE)
 
-  def apply(read: ADAMRecord, startOffset: Int, endOffset: Int): Array[Int] = {
+  def apply(read: RichADAMRecord, startOffset: Int, endOffset: Int): Array[Int] = {
     // the context of a covariate is the previous @size bases, though "previous" depends on
     // how the read was aligned (negative strand is reverse-complemented).
     if (read.getReadNegativeStrand) reverseContext(read, startOffset, endOffset) else forwardContext(read, startOffset, endOffset)
   }
 
@@ -69,7 +71,7 @@ case class BaseContext(records: RDD[ADAMRecord], size: Int) extends StandardCova
   // note: the last base is dropped from the construction of contexts because it is not
   // present in any context - just as the first base cannot have a context assigned to it.
-  def forwardContext(rec: ADAMRecord, st: Int, end: Int): Array[Int] = {
+  def forwardContext(rec: RichADAMRecord, st: Int, end: Int): Array[Int] = {
     getContext(rec.getSequence.asInstanceOf[String].toCharArray.map(_.toByte).slice(st, end))
   }
 
@@ -77,7 +79,7 @@ case class BaseContext(records: RDD[ADAMRecord], size: Int) extends StandardCova
     bases.map(b => COMPL_MP(b)).reverseIterator.toArray
   }
 
-  def reverseContext(rec: ADAMRecord, st: Int, end: Int): Array[Int] = {
+  def reverseContext(rec: RichADAMRecord, st: Int, end: Int): Array[Int] = {
     // first reverse-complement the sequence
     val baseSeq = simpleReverseComplement(rec.getSequence.asInstanceOf[String].toCharArray.map(_.toByte))
     getContext(baseSeq.slice(baseSeq.size - end, baseSeq.size - st))
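
To make the DiscreteCycle numbering concrete: cycles count machine cycles in sequencing order, so a negative-strand read is numbered from its far end, and a second-of-pair read gets negated values so the two mates of a pair never share a cycle bin. For a 5-base read:

    val readLength = 5
    val forward        = Range(1, 1 + readLength, 1).toArray // Array(1, 2, 3, 4, 5)
    val negativeStrand = Range(readLength, 0, -1).toArray    // Array(5, 4, 3, 2, 1)
    val secondOfPair   = negativeStrand.map(-_)              // Array(-5, -4, -3, -2, -1)
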
diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
index 0d31b99dbc..1258ce2cef 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
@@ -29,11 +29,12 @@ object RichADAMRecord {
   }
 
   implicit def recordToRichRecord(record: ADAMRecord): RichADAMRecord = new RichADAMRecord(record)
+  implicit def richRecordToRecord(record: RichADAMRecord): ADAMRecord = record.record
 }
 
 class IlluminaOptics(val tile: Long, val x: Long, val y: Long) {}
 
-class RichADAMRecord(record: ADAMRecord) {
+class RichADAMRecord(val record: ADAMRecord) {
 
   // Returns the quality scores as a list of bytes
   lazy val qualityScores: Array[Byte] = record.getQual.toString.toCharArray.map(q => (q - 33).toByte)

diff --git a/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala b/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala
index da439538ba..09f14d5cf7 100644
--- a/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala
+++ b/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala
@@ -20,6 +20,8 @@ import edu.berkeley.cs.amplab.adam.rdd.recalibration._
 import scala.collection.mutable
 import edu.berkeley.cs.amplab.adam.util.SparkFunSuite
 import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord
+import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord._
 
 class RecalibrateBaseQualitiesSuite extends SparkFunSuite {
 
   val RANDOM = new Random("RecalibrationSuite".hashCode)
@@ -389,7 +391,7 @@ class RecalibrateBaseQualitiesSuite extends SparkFunSuite {
     val rec2 = ADAMRecord.newBuilder().setRecordGroupId(rg2).setQual(qualStr(qual2)).build()
     val rec3 = ADAMRecord.newBuilder().setRecordGroupId(rg3).setQual(qualStr(qual3)).build()
     val records = List(rec1, rec2, rec3)
-    val recRDD = sc.makeRDD(records, 1)
+    val recRDD = sc.makeRDD(records, 1).map(new RichADAMRecord(_))
     System.out.println(recRDD.first())
     val qualByRG = new QualByRG(recRDD)
     val intervals = List((0, 29), (6, 29), (0, 21), (0, 20))
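
The pair of implicits in patch 4 (recordToRichRecord plus the new richRecordToRecord) is what lets the recalibration code hold RDD[RichADAMRecord] while call sites that expect a plain ADAMRecord, such as returning record unchanged from recalibrate, keep compiling. A minimal sketch of the same two-way enrichment pattern, with made-up types:

    class Rec(val name: String)
    class RichRec(val rec: Rec) { def nameUpper: String = rec.name.toUpperCase }

    object RichRec {
      implicit def recToRichRec(r: Rec): RichRec = new RichRec(r)
      implicit def richRecToRec(r: RichRec): Rec = r.rec
    }

    object Demo {
      import RichRec._
      def wantsRec(r: Rec): String = r.name

      def main(args: Array[String]): Unit = {
        val rich = new RichRec(new Rec("read1"))
        println(rich.nameUpper)  // enriched method on the wrapper
        println(wantsRec(rich))  // implicitly unwrapped back to Rec
      }
    }
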
From 24bcce2b9d0fe57fc5db9fa0fb1e726bed11b6f1 Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Fri, 10 Jan 2014 16:44:35 -0800
Subject: [PATCH 5/5] Rename 'offset' to 'readOffset' and 'position' to
 'referencePosition'

---
 .../cs/amplab/adam/models/SnpTable.scala     |  4 ++--
 .../rdd/recalibration/ReadCovariates.scala   |  4 ++--
 .../cs/amplab/adam/rich/RichADAMRecord.scala | 19 +++++++++----------
 3 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala
index c8a7140841..91c456f96b 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/models/SnpTable.scala
@@ -12,8 +12,8 @@ import java.io.File
 class SnpTable(private val table: Map[String, Set[Long]]) extends Serializable with Logging {
   log.info("SNP table has %s contigs and %s entries".format(table.size, table.values.map(_.size).sum))
 
-  def isMaskedAtOffset(read: ADAMRecord, offset: Int): Boolean = {
-    val position = read.offsetToPosition(offset)
+  def isMaskedAtReadOffset(read: ADAMRecord, offset: Int): Boolean = {
+    val position = read.readOffsetToReferencePosition(offset)
     try {
       position.isEmpty || table(read.getReferenceName.toString).contains(position.get)
     } catch {

diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
index 11561b31c9..8e015d3b6d 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala
@@ -41,9 +41,9 @@ class ReadCovariates(val read: RichADAMRecord, qualByRG: QualByRG, covars: List[
 
   override def next(): BaseCovariates = {
     val offset = (iter_position - startOffset).toInt
-    val mismatch = read.isMismatchAtOffset(offset)
+    val mismatch = read.isMismatchAtReadOffset(offset)
     // FIXME: why does empty mismatch mean it should be masked?
-    val isMasked = dbSNP.isMaskedAtOffset(read, offset) || mismatch.isEmpty
+    val isMasked = dbSNP.isMaskedAtReadOffset(read, offset) || mismatch.isEmpty
     // getOrElse because reads without an MD tag can appear during *application* of recal table
     val isMismatch = mismatch.getOrElse(false)
     iter_position += 1
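
Taken together with SnpTable.isMaskedAtReadOffset, the masking rule above means a base only contributes evidence to the recalibration table when its reference position is known, it is not a known SNP site, and mismatch information (an MD tag) is available:

    position known?   in dbSNP?   MD tag present?   isMasked
    yes               no          yes               false  (base is counted)
    yes               yes         yes               true   (known variant site)
    no (insertion)    --          yes               true   (no reference position)
    yes               no          no                true   (no mismatch evidence)

A contig missing from the SNP table entirely is treated as unmasked (the NoSuchElementException case in SnpTable returns false).
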
diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
index 1258ce2cef..9abd689eb3 100644
--- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
+++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rich/RichADAMRecord.scala
@@ -110,8 +110,8 @@ class RichADAMRecord(val record: ADAMRecord) {
     }
   }
 
-  // does this read overlap with the given position?
-  def overlapsPosition(pos: Long): Option[Boolean] = {
+  // Does this read overlap with the given reference position?
+  def overlapsReferencePosition(pos: Long): Option[Boolean] = {
     if (record.getReadMapped) {
       Some(record.getStart <= pos && pos < end.get)
     } else {
@@ -119,22 +119,22 @@ class RichADAMRecord(val record: ADAMRecord) {
     }
   }
 
-  // does this read mismatch the reference at the given *reference* position?
-  def isMismatchAtPosition(pos: Long): Option[Boolean] = {
-    if (mdEvent.isEmpty || !overlapsPosition(pos).get) {
+  // Does this read mismatch the reference at the given reference position?
+  def isMismatchAtReferencePosition(pos: Long): Option[Boolean] = {
+    if (mdEvent.isEmpty || !overlapsReferencePosition(pos).get) {
       None
     } else {
       Some(!mdEvent.get.isMatch(pos))
     }
   }
 
-  // does this read mismatch the reference at the given offset within the read?
-  def isMismatchAtOffset(offset: Int): Option[Boolean] = {
+  // Does this read mismatch the reference at the given offset within the read?
+  def isMismatchAtReadOffset(offset: Int): Option[Boolean] = {
     // careful about offsets that are within an insertion!
     if (referencePositions.isEmpty) {
       None
     } else {
-      offsetToPosition(offset).flatMap(isMismatchAtPosition)
+      readOffsetToReferencePosition(offset).flatMap(isMismatchAtReferencePosition)
     }
   }
 
@@ -171,8 +171,7 @@ class RichADAMRecord(val record: ADAMRecord) {
   }
 
-  // get the reference position of an offset within the read
-  def offsetToPosition(offset: Int): Option[Long] = {
+  def readOffsetToReferencePosition(offset: Int): Option[Long] = {
     if (record.getReadMapped) {
       referencePositions(offset)
     } else {
       None