From d6137af9ec741a2671f78f0e269ad5cb315ddf87 Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Mon, 19 Feb 2018 21:09:04 -0500 Subject: [PATCH] added reduceByKey to merge partitions --- .../avocado/genotyping/posAlleleStats.scala | 84 +++++++------------ .../org/bdgenomics/avocado/genotyping/test5 | 2 - 2 files changed, 29 insertions(+), 57 deletions(-) delete mode 100644 avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala index 419725cc..39c926ba 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/posAlleleStats.scala @@ -1,5 +1,7 @@ package org.bdgenomics.avocado.genotyping +import java.io + import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.bdgenomics.adam.models.ReferencePosition @@ -14,9 +16,7 @@ import scala.annotation.tailrec import scala.collection.mutable /** - * Discovers the variants present in a set of aligned reads. - * - * Useful for force-calling variants. + * Counts Bases (A C G T) as well as ins / del at each position covered by at least one read in the input alignment file */ object PosAlleleStats extends Serializable with Logging { @@ -24,11 +24,9 @@ object PosAlleleStats extends Serializable with Logging { def runPosAlleleStats(alignmentRecordRDD: AlignmentRecordRDD, phredThreshold: Int = 33): RDD[(org.bdgenomics.adam.models.ReferencePosition, org.bdgenomics.avocado.genotyping.PosAlleleStats.AlleleCounts)] = { - //alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions) - def posAlleleStatsMapPartitions(in: Iterator[AlignmentRecord]): Iterator[(ReferencePosition, AlleleCounts)] = { - // val x: mutable.Map[ReferencePosition, mutable.HashMap[Char,Int]] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, mutable.HashMap[Char,Int]] + // hash of allele counters per genomics position val x: mutable.Map[ReferencePosition, AlleleCounts] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, AlleleCounts] in.foreach(read => { @@ -119,44 +117,31 @@ object PosAlleleStats extends Serializable with Logging { } @tailrec def recordAlleles(iter: Iterator[ObservationOperator], - lastRef: String = "", - variants: List[DiscoveredVariant] = List.empty): Unit = { + lastRef: String = ""): Unit = { if (!iter.hasNext) { - variants.toIterable + None } else { - // pop from the iterator val obs = iter.next - // update the list and position and advance - val (nextRef, nextVariants) = obs match { + val (nextRef) = obs match { case Clipped(_, _) => { - (lastRef, variants) + (lastRef) } case Match(length, optRef) => { val kv = optRef.fold({ - (0 until length).foreach(i => if (true /*qual(idx + i).toInt - 33 >= phredThreshold */ ) { + (0 until length).foreach(i => { incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) - println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) + //println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) }) - (sequence(idx + length - 1).toString, variants) + sequence(idx + length - 1).toString })((ref: String) => { - val newVars = (0 until length).flatMap(i => { - if (true /*qual(idx + i).toInt - 33 >= phredThreshold*/ ) { - println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) - - incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) - Some(DiscoveredVariant( - contigName, - pos + i, - ref(i).toString, - sequence(idx + i).toString)) - } else { - None - } - }).toList ::: variants - (ref.last.toString, newVars) + (0 until length).foreach(i => { + //println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33)) + incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33) + }) + ref.last.toString }) pos += length idx += length @@ -164,50 +149,39 @@ object PosAlleleStats extends Serializable with Logging { } case Insertion(length) => { val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length - val newVar = if (insQuals >= phredThreshold) { - DiscoveredVariant( - contigName, - pos - 1, - lastRef, - sequence.substring(idx - 1, idx + length)) :: variants - } else { - variants - } + if (insQuals >= phredThreshold) { x(ReferencePosition(contigName, pos - 1)).ins += 1 } idx += length - (lastRef, newVar) + (lastRef) } case Deletion(ref) => { val delLength = ref.size - val newVar = if (qual(idx - 1).toInt - 33 >= phredThreshold) { - DiscoveredVariant( - contigName, - pos - 1, - lastRef + ref, - sequence.substring(idx - 1, idx)) :: variants - } else { - variants - } + if (qual(idx - 1).toInt - 33 >= phredThreshold) { x(ReferencePosition(contigName, pos)).del += 1 } pos += delLength - (ref.last.toString, newVar) + (ref.last.toString) } } // recurse - recordAlleles(iter, nextRef, nextVariants) + recordAlleles(iter, nextRef) } } recordAlleles(opsIter) } - }) //foreach read + }) - //val z: Iterator[(ReferencePosition, AlleleCounts)] = x.toIterator x.toIterator } alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions) - + .reduceByKey((x, y) => AlleleCounts(x.A + y.A, + x.C + y.C, + x.G + y.G, + x.T + y.T, + x.N + y.N, + x.ins + y.ins, + x.del + y.del)) } } diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 deleted file mode 100644 index b578f537..00000000 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/test5 +++ /dev/null @@ -1,2 +0,0 @@ -TTAACTTAAAATGGGAAGAGAGCAAAACATTCCACAGGCAAAGTCAGGGGACTATTTTAATGACAGAGACACATGGTTGAAATCAATGCTGCACTCTGAGCCCTTAGAAACCGTCGCAAAGGACTCCACGGAAGGGTGGGGGCAGCCCCAGCGAGGTCGTCAGGGAGCAG G GCCCGAGTCATGGGACACAAGGCCGGACCCCTCACCGCCAGGGAAGCGGAGTCACAGGCACGTGGCCCTCTGCGCCCAG -############@@?CC@@@A=ABB@<@;,9A@?@B>?ABA@;A?AB>?8@@@AA==9@A??@@A?=>=>?>:>?A;@?A@A>;B=<>@=/A6?A7==AAA??=?AA@@7@?AA@@@?AA@A==?BA7?A@?B5A>???<@@A > 6AA8?B=A?@A@@??@?@A>;?@5@<7A?>>:@?@6?@@@?@?A@?7@@@>A@@@@@@@@7?CAAA:ABABA7@CCA@? \ No newline at end of file