From 0f3dca333372bfc940cf3af84a455074afe174b6 Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Mon, 19 Feb 2018 08:49:41 -0500 Subject: [PATCH 1/4] posAlleleStats working --- .../avocado/genotyping/posAlleleStats.scala | 201 ++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala new file mode 100644 index 00000000..0352752c --- /dev/null +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala @@ -0,0 +1,201 @@ +package org.bdgenomics.avocado.genotyping + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.bdgenomics.adam.models.ReferencePosition +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.avocado.Timers._ +import org.bdgenomics.avocado.models.{ Clipped, Deletion, Insertion, Match, ObservationOperator } +import org.bdgenomics.formats.avro.{ AlignmentRecord, Variant } +import org.bdgenomics.utils.misc.Logging + +import scala.annotation.tailrec +import scala.collection.mutable + +/** + * Discovers the variants present in a set of aligned reads. + * + * Useful for force-calling variants. + */ +object PosAlleleStats extends Serializable with Logging { + + case class AlleleCounts(var A: Int, var C: Int, var G: Int, var T: Int) + + def posAlleleStatsMapPartitions(in: Iterator[AlignmentRecord]): Iterator[(ReferencePosition, AlleleCounts)] = { + + // val x: mutable.Map[ReferencePosition, mutable.HashMap[Char,Int]] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, mutable.HashMap[Char,Int]] + val x: mutable.Map[ReferencePosition, AlleleCounts] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, AlleleCounts] + + in.foreach(read => { + if (!read.getReadMapped) { + Iterable.empty + } else { + // extract the alignment blocks from the read + val ops = try { + ObservationOperator.extractAlignmentOperators(read) + } catch { + case t: Throwable => { + log.warn("Extracting alignment operators from %s failed with %s.".format( + read.getReadName, t)) + Iterable.empty + } + } + + // where are we on the reference and in the read? + var pos = read.getStart.toInt + var idx = 0 + + // get the read sequence, contig, etc + val sequence = read.getSequence + val qual = read.getQual + val contigName = read.getContigName + + // advance to the first alignment match + @tailrec def fastForward( + iter: BufferedIterator[ObservationOperator]): Iterator[ObservationOperator] = { + + if (!iter.hasNext) { + Iterator() + } else { + val stop = iter.head match { + case Clipped(_, false) => false + case Clipped(length, true) => { + idx += length + false + } + case Insertion(length) => { + idx += length + false + } + case Deletion(ref) => { + pos += ref.length + false + } + case Match(_, _) => { + true + } + } + + if (stop) { + iter.toIterator + } else { + // pop from the iterator and recurse + iter.next + fastForward(iter) + } + } + } + + val opsIter = fastForward(ops.toIterator.buffered) + + def incrementBaseCount(i: Char, pos: ReferencePosition): Unit = { + + if (!x.isDefinedAt(pos)) { + x(pos) = AlleleCounts(0, 0, 0, 0) + } + + if (i.toUpper == 'A') { x(pos).A += 1 } + else if (i.toUpper == 'C') { x(pos).C += 1 } + else if (i.toUpper == 'C') { x(pos).G += 1 } + else if (i.toUpper == 'T') { x(pos).G += 1 } + } + + @tailrec def recordAlleles(iter: Iterator[ObservationOperator], + lastRef: String = "", + variants: List[DiscoveredVariant] = List.empty, + phredThreshold: Int = 30): Unit = { + + if (!iter.hasNext) { + variants.toIterable + } else { + + // pop from the iterator + val obs = iter.next + + // update the list and position and advance + val (nextRef, nextVariants) = obs match { + case Clipped(_, _) => { + (lastRef, variants) + } + case Match(length, optRef) => { + val kv = optRef.fold({ + (0 until length).foreach(i => if (qual(i).toInt - 33 >= phredThreshold) { + + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) + }) + + //x(ReferencePosition(contigName,pos+1))=(1,1,1,1) }) + /* + (0 until length).flatMap( i => if (qual(i).toInt - 33 >= phredThreshold) { + x(ReferencePosition(contigName,pos+1))=(1,1,1,1) + None } else { + None} ) + */ + + (sequence(idx + length - 1).toString, variants) + })((ref: String) => { + val newVars = (0 until length).flatMap(i => { + if (qual(i).toInt - 33 >= phredThreshold) { + ///x(ReferencePosition(contigName,pos+i)) = (1,1,1,1) + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) + Some(DiscoveredVariant( + contigName, + pos + i, + ref(i).toString, + sequence(idx + i).toString)) + } else { + None + } + }).toList ::: variants + (ref.last.toString, newVars) + }) + pos += length + idx += length + kv + } + case Insertion(length) => { + val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length + val newVar = if (insQuals >= phredThreshold) { + DiscoveredVariant( + contigName, + pos - 1, + lastRef, + sequence.substring(idx - 1, idx + length)) :: variants + } else { + variants + } + idx += length + (lastRef, newVar) + } + case Deletion(ref) => { + val delLength = ref.size + val newVar = if (qual(idx - 1).toInt - 33 >= phredThreshold) { + DiscoveredVariant( + contigName, + pos - 1, + lastRef + ref, + sequence.substring(idx - 1, idx)) :: variants + } else { + variants + } + pos += delLength + (ref.last.toString, newVar) + } + } + + // recurse + recordAlleles(iter, nextRef, nextVariants) + } + } + + recordAlleles(opsIter) + } + + }) //foreach read + + //val z: Iterator[(ReferencePosition, AlleleCounts)] = x.toIterator + x.toIterator + } + +} \ No newline at end of file From 948b70e317191ff777db651613822f9026896d12 Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Mon, 19 Feb 2018 09:54:57 -0500 Subject: [PATCH 2/4] fix alleles - working --- .../avocado/genotyping/posAlleleStats.scala | 301 +++++++++--------- 1 file changed, 157 insertions(+), 144 deletions(-) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala index 0352752c..5ede82c7 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala @@ -22,180 +22,193 @@ object PosAlleleStats extends Serializable with Logging { case class AlleleCounts(var A: Int, var C: Int, var G: Int, var T: Int) - def posAlleleStatsMapPartitions(in: Iterator[AlignmentRecord]): Iterator[(ReferencePosition, AlleleCounts)] = { - - // val x: mutable.Map[ReferencePosition, mutable.HashMap[Char,Int]] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, mutable.HashMap[Char,Int]] - val x: mutable.Map[ReferencePosition, AlleleCounts] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, AlleleCounts] - - in.foreach(read => { - if (!read.getReadMapped) { - Iterable.empty - } else { - // extract the alignment blocks from the read - val ops = try { - ObservationOperator.extractAlignmentOperators(read) - } catch { - case t: Throwable => { - log.warn("Extracting alignment operators from %s failed with %s.".format( - read.getReadName, t)) - Iterable.empty + def runPosAlleleStats(alignmentRecordRDD: AlignmentRecordRDD, phredThreshold: Int = 33): RDD[(org.bdgenomics.adam.models.ReferencePosition, org.bdgenomics.avocado.genotyping.PosAlleleStats.AlleleCounts)] = { + + //alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions) + + def posAlleleStatsMapPartitions(in: Iterator[AlignmentRecord]): Iterator[(ReferencePosition, AlleleCounts)] = { + + // val x: mutable.Map[ReferencePosition, mutable.HashMap[Char,Int]] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, mutable.HashMap[Char,Int]] + val x: mutable.Map[ReferencePosition, AlleleCounts] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, AlleleCounts] + + in.foreach(read => { + if (!read.getReadMapped) { + Iterable.empty + } else { + // extract the alignment blocks from the read + val ops = try { + ObservationOperator.extractAlignmentOperators(read) + } catch { + case t: Throwable => { + log.warn("Extracting alignment operators from %s failed with %s.".format( + read.getReadName, t)) + Iterable.empty + } } - } - // where are we on the reference and in the read? - var pos = read.getStart.toInt - var idx = 0 - - // get the read sequence, contig, etc - val sequence = read.getSequence - val qual = read.getQual - val contigName = read.getContigName - - // advance to the first alignment match - @tailrec def fastForward( - iter: BufferedIterator[ObservationOperator]): Iterator[ObservationOperator] = { - - if (!iter.hasNext) { - Iterator() - } else { - val stop = iter.head match { - case Clipped(_, false) => false - case Clipped(length, true) => { - idx += length - false - } - case Insertion(length) => { - idx += length - false - } - case Deletion(ref) => { - pos += ref.length - false - } - case Match(_, _) => { - true - } - } + // where are we on the reference and in the read? + var pos = read.getStart.toInt + var idx = 0 + + // get the read sequence, contig, etc + val sequence = read.getSequence + val qual = read.getQual + val contigName = read.getContigName - if (stop) { - iter.toIterator + // advance to the first alignment match + @tailrec def fastForward( + iter: BufferedIterator[ObservationOperator]): Iterator[ObservationOperator] = { + + if (!iter.hasNext) { + Iterator() } else { - // pop from the iterator and recurse - iter.next - fastForward(iter) + val stop = iter.head match { + case Clipped(_, false) => false + case Clipped(length, true) => { + idx += length + false + } + case Insertion(length) => { + idx += length + false + } + case Deletion(ref) => { + pos += ref.length + false + } + case Match(_, _) => { + true + } + } + + if (stop) { + iter.toIterator + } else { + // pop from the iterator and recurse + iter.next + fastForward(iter) + } } } - } - val opsIter = fastForward(ops.toIterator.buffered) + val opsIter = fastForward(ops.toIterator.buffered) - def incrementBaseCount(i: Char, pos: ReferencePosition): Unit = { + def incrementBaseCount(i: Char, pos: ReferencePosition): Unit = { - if (!x.isDefinedAt(pos)) { - x(pos) = AlleleCounts(0, 0, 0, 0) - } + if (!x.isDefinedAt(pos)) { + x(pos) = AlleleCounts(0, 0, 0, 0) + } - if (i.toUpper == 'A') { x(pos).A += 1 } - else if (i.toUpper == 'C') { x(pos).C += 1 } - else if (i.toUpper == 'C') { x(pos).G += 1 } - else if (i.toUpper == 'T') { x(pos).G += 1 } - } + if (i.toUpper == 'A') { + x(pos).A += 1 + } else if (i.toUpper == 'C') { + x(pos).C += 1 + } else if (i.toUpper == 'G') { + x(pos).G += 1 + } else if (i.toUpper == 'T') { + x(pos).T += 1 + } + } - @tailrec def recordAlleles(iter: Iterator[ObservationOperator], - lastRef: String = "", - variants: List[DiscoveredVariant] = List.empty, - phredThreshold: Int = 30): Unit = { + @tailrec def recordAlleles(iter: Iterator[ObservationOperator], + lastRef: String = "", + variants: List[DiscoveredVariant] = List.empty /*phredThreshold: Int = 30 */ ): Unit = { - if (!iter.hasNext) { - variants.toIterable - } else { + if (!iter.hasNext) { + variants.toIterable + } else { - // pop from the iterator - val obs = iter.next + // pop from the iterator + val obs = iter.next - // update the list and position and advance - val (nextRef, nextVariants) = obs match { - case Clipped(_, _) => { - (lastRef, variants) - } - case Match(length, optRef) => { - val kv = optRef.fold({ - (0 until length).foreach(i => if (qual(i).toInt - 33 >= phredThreshold) { + // update the list and position and advance + val (nextRef, nextVariants) = obs match { + case Clipped(_, _) => { + (lastRef, variants) + } + case Match(length, optRef) => { + val kv = optRef.fold({ + (0 until length).foreach(i => if (qual(i).toInt - 33 >= phredThreshold) { - incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) - }) + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) + }) - //x(ReferencePosition(contigName,pos+1))=(1,1,1,1) }) - /* + //x(ReferencePosition(contigName,pos+1))=(1,1,1,1) }) + /* (0 until length).flatMap( i => if (qual(i).toInt - 33 >= phredThreshold) { x(ReferencePosition(contigName,pos+1))=(1,1,1,1) None } else { None} ) */ - (sequence(idx + length - 1).toString, variants) - })((ref: String) => { - val newVars = (0 until length).flatMap(i => { - if (qual(i).toInt - 33 >= phredThreshold) { - ///x(ReferencePosition(contigName,pos+i)) = (1,1,1,1) - incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) - Some(DiscoveredVariant( - contigName, - pos + i, - ref(i).toString, - sequence(idx + i).toString)) - } else { - None - } - }).toList ::: variants - (ref.last.toString, newVars) - }) - pos += length - idx += length - kv - } - case Insertion(length) => { - val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length - val newVar = if (insQuals >= phredThreshold) { - DiscoveredVariant( - contigName, - pos - 1, - lastRef, - sequence.substring(idx - 1, idx + length)) :: variants - } else { - variants + (sequence(idx + length - 1).toString, variants) + })((ref: String) => { + val newVars = (0 until length).flatMap(i => { + if (qual(i).toInt - 33 >= phredThreshold) { + ///x(ReferencePosition(contigName,pos+i)) = (1,1,1,1) + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) + Some(DiscoveredVariant( + contigName, + pos + i, + ref(i).toString, + sequence(idx + i).toString)) + } else { + None + } + }).toList ::: variants + (ref.last.toString, newVars) + }) + pos += length + idx += length + kv } - idx += length - (lastRef, newVar) - } - case Deletion(ref) => { - val delLength = ref.size - val newVar = if (qual(idx - 1).toInt - 33 >= phredThreshold) { - DiscoveredVariant( - contigName, - pos - 1, - lastRef + ref, - sequence.substring(idx - 1, idx)) :: variants - } else { - variants + case Insertion(length) => { + val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length + val newVar = if (insQuals >= phredThreshold) { + DiscoveredVariant( + contigName, + pos - 1, + lastRef, + sequence.substring(idx - 1, idx + length)) :: variants + } else { + variants + } + idx += length + (lastRef, newVar) + } + case Deletion(ref) => { + val delLength = ref.size + val newVar = if (qual(idx - 1).toInt - 33 >= phredThreshold) { + DiscoveredVariant( + contigName, + pos - 1, + lastRef + ref, + sequence.substring(idx - 1, idx)) :: variants + } else { + variants + } + pos += delLength + (ref.last.toString, newVar) } - pos += delLength - (ref.last.toString, newVar) } - } - // recurse - recordAlleles(iter, nextRef, nextVariants) + // recurse + recordAlleles(iter, nextRef, nextVariants) + } } + + recordAlleles(opsIter) } - recordAlleles(opsIter) - } + }) //foreach read - }) //foreach read + //val z: Iterator[(ReferencePosition, AlleleCounts)] = x.toIterator + x.toIterator + } + + alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions) - //val z: Iterator[(ReferencePosition, AlleleCounts)] = x.toIterator - x.toIterator } -} \ No newline at end of file +} + From bab66958eff63cbf8423ad4a98455f33ddae096c Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Mon, 19 Feb 2018 20:14:13 -0500 Subject: [PATCH 3/4] phred filter working --- .../avocado/genotyping/posAlleleStats.scala | 58 +++++++++---------- .../org/bdgenomics/avocado/genotyping/test5 | 2 + 2 files changed, 31 insertions(+), 29 deletions(-) create mode 100644 avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala index 5ede82c7..419725cc 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala @@ -20,7 +20,7 @@ import scala.collection.mutable */ object PosAlleleStats extends Serializable with Logging { - case class AlleleCounts(var A: Int, var C: Int, var G: Int, var T: Int) + case class AlleleCounts(var A: Int, var C: Int, var G: Int, var T: Int, var N: Int, var ins: Int, var del: Int) def runPosAlleleStats(alignmentRecordRDD: AlignmentRecordRDD, phredThreshold: Int = 33): RDD[(org.bdgenomics.adam.models.ReferencePosition, org.bdgenomics.avocado.genotyping.PosAlleleStats.AlleleCounts)] = { @@ -51,8 +51,12 @@ object PosAlleleStats extends Serializable with Logging { var idx = 0 // get the read sequence, contig, etc - val sequence = read.getSequence - val qual = read.getQual + val sequence: String = read.getSequence + val qual: String = read.getQual + + println(sequence) + println(qual) + val contigName = read.getContigName // advance to the first alignment match @@ -93,26 +97,30 @@ object PosAlleleStats extends Serializable with Logging { val opsIter = fastForward(ops.toIterator.buffered) - def incrementBaseCount(i: Char, pos: ReferencePosition): Unit = { + def incrementBaseCount(i: Char, pos: ReferencePosition, qual: Int): Unit = { if (!x.isDefinedAt(pos)) { - x(pos) = AlleleCounts(0, 0, 0, 0) + x(pos) = AlleleCounts(0, 0, 0, 0, 0, 0, 0) } - if (i.toUpper == 'A') { - x(pos).A += 1 - } else if (i.toUpper == 'C') { - x(pos).C += 1 - } else if (i.toUpper == 'G') { - x(pos).G += 1 - } else if (i.toUpper == 'T') { - x(pos).T += 1 + if (qual >= phredThreshold) { + if (i.toUpper == 'A') { + x(pos).A += 1 + } else if (i.toUpper == 'C') { + x(pos).C += 1 + } else if (i.toUpper == 'G') { + x(pos).G += 1 + } else if (i.toUpper == 'T') { + x(pos).T += 1 + } + } else { + x(pos).N += 1 } } @tailrec def recordAlleles(iter: Iterator[ObservationOperator], lastRef: String = "", - variants: List[DiscoveredVariant] = List.empty /*phredThreshold: Int = 30 */ ): Unit = { + variants: List[DiscoveredVariant] = List.empty): Unit = { if (!iter.hasNext) { variants.toIterable @@ -128,25 +136,17 @@ object PosAlleleStats extends Serializable with Logging { } case Match(length, optRef) => { val kv = optRef.fold({ - (0 until length).foreach(i => if (qual(i).toInt - 33 >= phredThreshold) { - - incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) + (0 until length).foreach(i => if (true /*qual(idx + i).toInt - 33 >= phredThreshold */ ) { + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) + println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) }) - - //x(ReferencePosition(contigName,pos+1))=(1,1,1,1) }) - /* - (0 until length).flatMap( i => if (qual(i).toInt - 33 >= phredThreshold) { - x(ReferencePosition(contigName,pos+1))=(1,1,1,1) - None } else { - None} ) - */ - (sequence(idx + length - 1).toString, variants) })((ref: String) => { val newVars = (0 until length).flatMap(i => { - if (qual(i).toInt - 33 >= phredThreshold) { - ///x(ReferencePosition(contigName,pos+i)) = (1,1,1,1) - incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i)) + if (true /*qual(idx + i).toInt - 33 >= phredThreshold*/ ) { + println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) + + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) Some(DiscoveredVariant( contigName, pos + i, diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 new file mode 100644 index 00000000..b578f537 --- /dev/null +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 @@ -0,0 +1,2 @@ +TTAACTTAAAATGGGAAGAGAGCAAAACATTCCACAGGCAAAGTCAGGGGACTATTTTAATGACAGAGACACATGGTTGAAATCAATGCTGCACTCTGAGCCCTTAGAAACCGTCGCAAAGGACTCCACGGAAGGGTGGGGGCAGCCCCAGCGAGGTCGTCAGGGAGCAG G GCCCGAGTCATGGGACACAAGGCCGGACCCCTCACCGCCAGGGAAGCGGAGTCACAGGCACGTGGCCCTCTGCGCCCAG +############@@?CC@@@A=ABB@<@;,9A@?@B>?ABA@;A?AB>?8@@@AA==9@A??@@A?=>=>?>:>?A;@?A@A>;B=<>@=/A6?A7==AAA??=?AA@@7@?AA@@@?AA@A==?BA7?A@?B5A>???<@@A > 6AA8?B=A?@A@@??@?@A>;?@5@<7A?>>:@?@6?@@@?@?A@?7@@@>A@@@@@@@@7?CAAA:ABABA7@CCA@? \ No newline at end of file From d6137af9ec741a2671f78f0e269ad5cb315ddf87 Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Mon, 19 Feb 2018 21:09:04 -0500 Subject: [PATCH 4/4] added reduceByKey to merge partitions --- .../avocado/genotyping/posAlleleStats.scala | 84 +++++++------------ .../org/bdgenomics/avocado/genotyping/test5 | 2 - 2 files changed, 29 insertions(+), 57 deletions(-) delete mode 100644 avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala index 419725cc..39c926ba 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala @@ -1,5 +1,7 @@ package org.bdgenomics.avocado.genotyping +import java.io + import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.bdgenomics.adam.models.ReferencePosition @@ -14,9 +16,7 @@ import scala.annotation.tailrec import scala.collection.mutable /** - * Discovers the variants present in a set of aligned reads. - * - * Useful for force-calling variants. + * Counts Bases (A C G T) as well as ins / del at each position covered by at least one read in the input alignment file */ object PosAlleleStats extends Serializable with Logging { @@ -24,11 +24,9 @@ object PosAlleleStats extends Serializable with Logging { def runPosAlleleStats(alignmentRecordRDD: AlignmentRecordRDD, phredThreshold: Int = 33): RDD[(org.bdgenomics.adam.models.ReferencePosition, org.bdgenomics.avocado.genotyping.PosAlleleStats.AlleleCounts)] = { - //alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions) - def posAlleleStatsMapPartitions(in: Iterator[AlignmentRecord]): Iterator[(ReferencePosition, AlleleCounts)] = { - // val x: mutable.Map[ReferencePosition, mutable.HashMap[Char,Int]] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, mutable.HashMap[Char,Int]] + // hash of allele counters per genomics position val x: mutable.Map[ReferencePosition, AlleleCounts] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, AlleleCounts] in.foreach(read => { @@ -119,44 +117,31 @@ object PosAlleleStats extends Serializable with Logging { } @tailrec def recordAlleles(iter: Iterator[ObservationOperator], - lastRef: String = "", - variants: List[DiscoveredVariant] = List.empty): Unit = { + lastRef: String = ""): Unit = { if (!iter.hasNext) { - variants.toIterable + None } else { - // pop from the iterator val obs = iter.next - // update the list and position and advance - val (nextRef, nextVariants) = obs match { + val (nextRef) = obs match { case Clipped(_, _) => { - (lastRef, variants) + (lastRef) } case Match(length, optRef) => { val kv = optRef.fold({ - (0 until length).foreach(i => if (true /*qual(idx + i).toInt - 33 >= phredThreshold */ ) { + (0 until length).foreach(i => { incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) - println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) + //println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) }) - (sequence(idx + length - 1).toString, variants) + sequence(idx + length - 1).toString })((ref: String) => { - val newVars = (0 until length).flatMap(i => { - if (true /*qual(idx + i).toInt - 33 >= phredThreshold*/ ) { - println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) - - incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) - Some(DiscoveredVariant( - contigName, - pos + i, - ref(i).toString, - sequence(idx + i).toString)) - } else { - None - } - }).toList ::: variants - (ref.last.toString, newVars) + (0 until length).foreach(i => { + //println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) + }) + ref.last.toString }) pos += length idx += length @@ -164,50 +149,39 @@ object PosAlleleStats extends Serializable with Logging { } case Insertion(length) => { val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length - val newVar = if (insQuals >= phredThreshold) { - DiscoveredVariant( - contigName, - pos - 1, - lastRef, - sequence.substring(idx - 1, idx + length)) :: variants - } else { - variants - } + if (insQuals >= phredThreshold) { x(ReferencePosition(contigName, pos - 1)).ins += 1 } idx += length - (lastRef, newVar) + (lastRef) } case Deletion(ref) => { val delLength = ref.size - val newVar = if (qual(idx - 1).toInt - 33 >= phredThreshold) { - DiscoveredVariant( - contigName, - pos - 1, - lastRef + ref, - sequence.substring(idx - 1, idx)) :: variants - } else { - variants - } + if (qual(idx - 1).toInt - 33 >= phredThreshold) { x(ReferencePosition(contigName, pos)).del += 1 } pos += delLength - (ref.last.toString, newVar) + (ref.last.toString) } } // recurse - recordAlleles(iter, nextRef, nextVariants) + recordAlleles(iter, nextRef) } } recordAlleles(opsIter) } - }) //foreach read + }) - //val z: Iterator[(ReferencePosition, AlleleCounts)] = x.toIterator x.toIterator } alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions) - + .reduceByKey((x, y) => AlleleCounts(x.A + y.A, + x.C + y.C, + x.G + y.G, + x.T + y.T, + x.N + y.N, + x.ins + y.ins, + x.del + y.del)) } } diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 deleted file mode 100644 index b578f537..00000000 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 +++ /dev/null @@ -1,2 +0,0 @@ -TTAACTTAAAATGGGAAGAGAGCAAAACATTCCACAGGCAAAGTCAGGGGACTATTTTAATGACAGAGACACATGGTTGAAATCAATGCTGCACTCTGAGCCCTTAGAAACCGTCGCAAAGGACTCCACGGAAGGGTGGGGGCAGCCCCAGCGAGGTCGTCAGGGAGCAG G GCCCGAGTCATGGGACACAAGGCCGGACCCCTCACCGCCAGGGAAGCGGAGTCACAGGCACGTGGCCCTCTGCGCCCAG -############@@?CC@@@A=ABB@<@;,9A@?@B>?ABA@;A?AB>?8@@@AA==9@A??@@A?=>=>?>:>?A;@?A@A>;B=<>@=/A6?A7==AAA??=?AA@@7@?AA@@@?AA@A==?BA7?A@?B5A>???<@@A > 6AA8?B=A?@A@@??@?@A>;?@5@<7A?>>:@?@6?@@@?@?A@?7@@@>A@@@@@@@@7?CAAA:ABABA7@CCA@? \ No newline at end of file