Skip to content

Commit

Permalink
added reduceByKey to merge partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
jpdna committed Feb 20, 2018
1 parent bab6695 commit d6137af
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.bdgenomics.avocado.genotyping

import java.io

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.bdgenomics.adam.models.ReferencePosition
Expand All @@ -14,21 +16,17 @@ import scala.annotation.tailrec
import scala.collection.mutable

/**
* Discovers the variants present in a set of aligned reads.
*
* Useful for force-calling variants.
* Counts Bases (A C G T) as well as ins / del at each position covered by at least one read in the input alignment file
*/
object PosAlleleStats extends Serializable with Logging {

case class AlleleCounts(var A: Int, var C: Int, var G: Int, var T: Int, var N: Int, var ins: Int, var del: Int)

def runPosAlleleStats(alignmentRecordRDD: AlignmentRecordRDD, phredThreshold: Int = 33): RDD[(org.bdgenomics.adam.models.ReferencePosition, org.bdgenomics.avocado.genotyping.PosAlleleStats.AlleleCounts)] = {

//alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions)

def posAlleleStatsMapPartitions(in: Iterator[AlignmentRecord]): Iterator[(ReferencePosition, AlleleCounts)] = {

// val x: mutable.Map[ReferencePosition, mutable.HashMap[Char,Int]] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, mutable.HashMap[Char,Int]]
// hash of allele counters per genomics position
val x: mutable.Map[ReferencePosition, AlleleCounts] = new scala.collection.mutable.HashMap[org.bdgenomics.adam.models.ReferencePosition, AlleleCounts]

in.foreach(read => {
Expand Down Expand Up @@ -119,95 +117,71 @@ object PosAlleleStats extends Serializable with Logging {
}

@tailrec def recordAlleles(iter: Iterator[ObservationOperator],
lastRef: String = "",
variants: List[DiscoveredVariant] = List.empty): Unit = {
lastRef: String = ""): Unit = {

if (!iter.hasNext) {
variants.toIterable
None
} else {

// pop from the iterator
val obs = iter.next

// update the list and position and advance
val (nextRef, nextVariants) = obs match {
val (nextRef) = obs match {
case Clipped(_, _) => {
(lastRef, variants)
(lastRef)
}
case Match(length, optRef) => {
val kv = optRef.fold({
(0 until length).foreach(i => if (true /*qual(idx + i).toInt - 33 >= phredThreshold */ ) {
(0 until length).foreach(i => {
incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33)
println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33))
//println("test qual i:" + i + " idx:" + idx + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33))
})
(sequence(idx + length - 1).toString, variants)
sequence(idx + length - 1).toString
})((ref: String) => {
val newVars = (0 until length).flatMap(i => {
if (true /*qual(idx + i).toInt - 33 >= phredThreshold*/ ) {
println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33))

incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33)
Some(DiscoveredVariant(
contigName,
pos + i,
ref(i).toString,
sequence(idx + i).toString))
} else {
None
}
}).toList ::: variants
(ref.last.toString, newVars)
(0 until length).foreach(i => {
//println("test qual i:" + i + " idx:" + idx + " ref:" + ref(i).toString() + " alt: " + qual(i).toString + " fixed_alt: " + qual(i + idx) + " qual: " + (qual(idx + i).toInt - 33))
incrementBaseCount(sequence(idx + i), ReferencePosition(contigName, pos + i), qual(idx + i).toInt - 33)
})
ref.last.toString
})
pos += length
idx += length
kv
}
case Insertion(length) => {
val insQuals = qual.substring(idx - 1, idx + length).map(_.toInt - 33).sum / length
val newVar = if (insQuals >= phredThreshold) {
DiscoveredVariant(
contigName,
pos - 1,
lastRef,
sequence.substring(idx - 1, idx + length)) :: variants
} else {
variants
}
if (insQuals >= phredThreshold) { x(ReferencePosition(contigName, pos - 1)).ins += 1 }
idx += length
(lastRef, newVar)
(lastRef)
}
case Deletion(ref) => {
val delLength = ref.size
val newVar = if (qual(idx - 1).toInt - 33 >= phredThreshold) {
DiscoveredVariant(
contigName,
pos - 1,
lastRef + ref,
sequence.substring(idx - 1, idx)) :: variants
} else {
variants
}
if (qual(idx - 1).toInt - 33 >= phredThreshold) { x(ReferencePosition(contigName, pos)).del += 1 }
pos += delLength
(ref.last.toString, newVar)
(ref.last.toString)
}
}

// recurse
recordAlleles(iter, nextRef, nextVariants)
recordAlleles(iter, nextRef)
}
}

recordAlleles(opsIter)
}

}) //foreach read
})

//val z: Iterator[(ReferencePosition, AlleleCounts)] = x.toIterator
x.toIterator
}

alignmentRecordRDD.rdd.mapPartitions(posAlleleStatsMapPartitions)

.reduceByKey((x, y) => AlleleCounts(x.A + y.A,
x.C + y.C,
x.G + y.G,
x.T + y.T,
x.N + y.N,
x.ins + y.ins,
x.del + y.del))
}

}
Expand Down

This file was deleted.

0 comments on commit d6137af

Please sign in to comment.