Upgrading to latest ADAM.
fnothaft committed Feb 3, 2015
1 parent b84107f · commit 80183e3
Showing 5 changed files with 32 additions and 18 deletions.
@@ -51,11 +51,11 @@ private[debrujin] case class Kmer(kmerSeq: String,

   def toDetailedString: String = {
     kmerSeq + " * " + multiplicity + ", " + refPos.fold("unmapped")("@ " + _) + "\n" +
-      "qual: " + phred.map(_.toString).fold("")(_ + ", " + _) + "\n" +
-      "mapq: " + mapq.map(_.toString).fold("")(_ + ", " + _) + "\n" +
-      "readId: " + readId.map(_.toString).fold("")(_ + ", " + _) + "\n" +
-      "pre: " + predecessors.map(_.toString).fold("")(_ + ", " + _) + "\n" +
-      "post: " + successors.map(_.toString).fold("")(_ + ", " + _)
+      "qual: " + phred.mkString(", ") + "\n" +
+      "mapq: " + mapq.mkString(", ") + "\n" +
+      "readId: " + readId.mkString(", ") + "\n" +
+      "pre: " + predecessors.mkString(", ") + "\n" +
+      "post: " + successors.mkString(", ")
   }
 
   /**
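A side note on the rewrite above: besides being shorter, mkString also fixes a quirk of the old fold-based joining, which seeds the fold with "" and therefore emits a separator before the first element. A minimal standalone sketch (plain Scala; phred here is a stand-in Seq, not the Kmer field):

    val phred = Seq(30, 35, 40)

    // old style: the "" seed picks up a leading separator
    val folded = phred.map(_.toString).fold("")(_ + ", " + _)  // ", 30, 35, 40"

    // new style: the separator appears only between elements
    val joined = phred.mkString(", ")                          // "30, 35, 40"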
@@ -21,8 +21,8 @@ import org.apache.commons.configuration.{ HierarchicalConfiguration, SubnodeConfiguration }
 import org.apache.spark.SparkContext._
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.models.{ ReferenceMapping, ReferenceRegion }
-import org.bdgenomics.adam.rdd.RegionJoin
+import org.bdgenomics.adam.models.{ ReferenceMapping, ReferenceRegion, SequenceDictionary }
+import org.bdgenomics.adam.rdd.ShuffleRegionJoin
 import org.bdgenomics.adam.rich.ReferenceMappingContext._
 import org.bdgenomics.avocado.algorithms.debrujin.KmerGraph
 import org.bdgenomics.avocado.models.{ Observation }
@@ -36,7 +36,7 @@ object ReassemblyExplorer extends ExplorerCompanion {
   protected def apply(stats: AvocadoConfigAndStats,
                       config: SubnodeConfiguration): Explorer = {
     val kmerLength = config.getInt("kmerLength", 20)
-    new ReassemblyExplorer(kmerLength, stats.reference)
+    new ReassemblyExplorer(kmerLength, stats.reference, stats.sequenceDict, stats.contigLengths)
   }
 
   implicit object ContigReferenceMapping extends ReferenceMapping[(Long, NucleotideContigFragment)] with Serializable {
@@ -48,7 +48,11 @@ object ReassemblyExplorer extends ExplorerCompanion {
 import ReassemblyExplorer._
 
 class ReassemblyExplorer(kmerLength: Int,
-                         reference: RDD[NucleotideContigFragment]) extends Explorer with Logging {
+                         reference: RDD[NucleotideContigFragment],
+                         sd: SequenceDictionary,
+                         contigLengths: Map[String, Long]) extends Explorer with Logging {
 
+  val totalAssembledReferenceLength = contigLengths.values.sum
+
   val companion: ExplorerCompanion = ReassemblyExplorer

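The new totalAssembledReferenceLength field feeds the partition sizing for the shuffle join in the next hunk: total reference length divided by the number of read partitions gives the approximate genomic width each join partition covers. A quick sketch of the arithmetic, with illustrative contig lengths and an assumed partition count:

    val contigLengths = Map("chr20" -> 63025520L, "chr21" -> 48129895L)
    val totalAssembledReferenceLength = contigLengths.values.sum   // 111155415
    val numReadPartitions = 8  // stand-in for reads.partitions.size
    val partitionSize = totalAssembledReferenceLength / numReadPartitions  // ~13.9 Mbp per partition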
@@ -76,9 +80,11 @@ class ReassemblyExplorer(kmerLength: Int,
     // filter mapped reads, join with reference contigs, then extract contig ids
     // ultimately, this should use the merge-sort join, not the broadcast join
     // will upgrade when ADAM-534 merges.
-    val joinWithId = RegionJoin.partitionAndJoin(reference.context,
+    val joinWithId = ShuffleRegionJoin.partitionAndJoin(reference.context,
       refIds,
-      reads.filter(_.getReadMapped))
+      reads.filter(_.getReadMapped),
+      sd,
+      totalAssembledReferenceLength / reads.partitions.size)
       .map(kv => {
         (kv._1._1, kv._2)
       })
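This hunk is presumably the upgrade the (now stale) comment anticipated: the broadcast-based RegionJoin is replaced by ShuffleRegionJoin, which bins both inputs by genomic region and joins them with a shuffle instead of collecting one side to the driver. The following toy sketch illustrates the binning idea only; it is not ADAM's implementation, and for brevity it misses overlaps that cross a bin boundary (a real implementation replicates each region into every bin it touches):

    import org.apache.spark.rdd.RDD

    case class Region(contig: String, start: Long, end: Long)

    // Key both sides by the fixed-width bin their region starts in,
    // shuffle-join on the bin key, then keep only truly overlapping pairs.
    def shuffleJoin[A, B](left: RDD[(Region, A)],
                          right: RDD[(Region, B)],
                          binSize: Long): RDD[(A, B)] = {
      def bin(r: Region) = (r.contig, r.start / binSize)
      left.map(kv => (bin(kv._1), kv))
        .join(right.map(kv => (bin(kv._1), kv)))
        .filter { case (_, ((lr, _), (rr, _))) =>
          lr.start < rr.end && rr.start < lr.end  // half-open overlap test
        }
        .map { case (_, ((_, a), (_, b))) => (a, b) }
    }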
@@ -30,7 +30,7 @@ import org.bdgenomics.formats.avro.AlignmentRecord
 import org.bdgenomics.adam.converters.VariantContextConverter
 import org.bdgenomics.adam.models.{ SAMFileHeaderWritable, VariantContext => ADAMVariantContext, ReferencePosition }
 import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.GenomicRegionPartitioner
+import org.bdgenomics.adam.rdd.GenomicPositionPartitioner
 import org.bdgenomics.adam.rdd.read.AlignmentRecordContext._
 import org.bdgenomics.avocado.models.{ Observation, ReadObservation }
 import org.bdgenomics.avocado.stats.AvocadoConfigAndStats
@@ -199,7 +199,7 @@ class ExternalGenotyper(contigLengths: Map[String, Long],

     // key reads by position and repartition
     val readsByPosition = reads.keyBy(r => ReferencePosition(r.get.getReferenceName.toString, r.get.getAlignmentStart))
-      .partitionBy(new GenomicRegionPartitioner(numPart, contigLengths))
+      .partitionBy(GenomicPositionPartitioner(numPart, contigLengths))
 
     if (debug) {
       log.info("have " + readsByPosition.count + " reads after partitioning")
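The genotyper-side change swaps GenomicRegionPartitioner for GenomicPositionPartitioner, built via its companion apply rather than new. To show the general shape of a position-based Spark partitioner, here is an illustrative stand-in; it is not ADAM's class, and the (contig, position) tuple key is a toy substitute for ReferencePosition:

    import org.apache.spark.Partitioner

    // Buckets (contig, position) keys into numParts ranges of roughly
    // equal genomic width over a flattened genome coordinate.
    class ToyPositionPartitioner(numParts: Int, contigLengths: Map[String, Long])
        extends Partitioner {
      private val total = contigLengths.values.sum
      // cumulative start offset of each contig in the flattened coordinate
      private val offsets: Map[String, Long] = {
        var acc = 0L
        contigLengths.toSeq.sortBy(_._1).map { case (name, len) =>
          val start = acc; acc += len; (name, start)
        }.toMap
      }
      override def numPartitions: Int = numParts
      override def getPartition(key: Any): Int = key match {
        case (contig: String, pos: Long) =>
          val flat = offsets(contig) + pos
          math.min((flat * numParts / total).toInt, numParts - 1)
        case _ => 0
      }
    }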
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.bdgenomics.adam.models.ReferenceRegion
 import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.adam.rdd.read.AlignmentRecordRDDFunctions
 import org.bdgenomics.adam.rich.RichGenotype._
 import org.bdgenomics.adam.rich.{ RichAlignmentRecord, RichGenotype }
 import org.bdgenomics.adam.util.SparkFunSuite
@@ -85,14 +86,17 @@ class ReassemblyExplorerSuite extends SparkFunSuite {
     sc.loadAlignments(path)
   }
 
+  lazy val sd = new AlignmentRecordRDDFunctions(na12878_chr20_snp_reads).adamGetSequenceDictionary()
+  lazy val cl = sd.records.map(r => (r.name, r.length)).toMap
+
   sparkTest("reassemble and call variants on real data") {
     val reads = na12878_chr20_snp_reads
 
     val reference = getReferenceFromReads(reads.collect.toSeq)
 
-    val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference)))
+    val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference)), sd, cl)
     val obs = re.discover(reads)
-    val bg = new BiallelicGenotyper()
+    val bg = new BiallelicGenotyper(sd, 2, false)
     val vc = bg.genotype(obs)
       .flatMap(_.genotypes)
       .filter(g => g.getType != GenotypeType.HOM_REF)
@@ -110,9 +114,9 @@

     val reference = getReferenceFromReads(reads.collect.toSeq)
 
-    val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference)))
+    val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference)), sd, cl)
     val obs = re.discover(reads)
-    val bg = new BiallelicGenotyper()
+    val bg = new BiallelicGenotyper(sd, 2, false)
     val vc = bg.genotype(obs)
       .flatMap(_.genotypes)
       .filter(g => g.getType != GenotypeType.HOM_REF)
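In the suite, the sequence dictionary is now derived from the reads themselves (via AlignmentRecordRDDFunctions.adamGetSequenceDictionary) and flattened into the contig-length map both constructors need. The map construction is plain Scala; a self-contained sketch with toy record types standing in for ADAM's (names and lengths illustrative):

    case class SequenceRecord(name: String, length: Long)
    case class SequenceDictionary(records: Seq[SequenceRecord])

    val sd = SequenceDictionary(Seq(
      SequenceRecord("chr20", 63025520L),
      SequenceRecord("chr21", 48129895L)))

    // same shape as the suite's `cl`: contig name -> contig length
    val cl: Map[String, Long] = sd.records.map(r => (r.name, r.length)).toMap
    assert(cl("chr20") == 63025520L)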
pom.xml: 5 additions & 1 deletion
@@ -20,7 +20,7 @@
     <java.version>1.7</java.version>
     <scala.version>2.10.3</scala.version>
     <scala.version.prefix>2.10</scala.version.prefix>
-    <spark.version>1.1.0</spark.version>
+    <spark.version>1.2.0</spark.version>
     <!-- Edit the following line to configure the Hadoop (HDFS) version. -->
     <hadoop.version>2.2.0</hadoop.version>
   </properties>
@@ -238,6 +238,10 @@
         <artifactId>hadoop-client</artifactId>
         <version>${hadoop.version}</version>
         <exclusions>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
           <exclusion>
             <groupId>asm</groupId>
             <artifactId>asm</artifactId>
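Alongside the Spark 1.1.0 to 1.2.0 bump, Guava is excluded from hadoop-client, presumably to keep Hadoop's older Guava from clashing with the version the new Spark expects (the commit itself does not state the motivation). Which Guava survives the exclusion can be confirmed with the standard dependency report:

    mvn dependency:tree -Dincludes=com.google.guava:guava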