Skip to content

Commit

Permalink
Modified CalculateDepth to calcuate coverage from alignment files
Browse files Browse the repository at this point in the history
  • Loading branch information
akmorrow13 committed Aug 9, 2016
1 parent e7e1adf commit 1624df6
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.projections.AlignmentRecordField._
import org.bdgenomics.adam.projections.Projection
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.features.CoverageRDD
import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
* Reads2Coverage (accessible as the command 'coverage' through the CLI) takes two arguments,
* an Read file and an output file, and calculates the number of reads from the Read file
* at every location in the file. Optional arguments are negativeStrands, and positiveStrands,
* which only save coverage computed from negative and positive strands, respectively.
*/
object Reads2Coverage extends BDGCommandCompanion {
val commandName: String = "coverage"
val commandDescription: String = "Calculate the depth from a given ADAM file"

def apply(cmdLine: Array[String]): BDGCommand = {
new Reads2Coverage(Args4j[Reads2CoverageArgs](cmdLine))
}
}

class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "The reads file to use to calculate depths", index = 0)
var inputPath: String = null
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the coverage data in ADAM/Parquet format", index = 1)
var outputPath: String = null
@Args4jOption(required = false, name = "-negativeStrands", usage = "Compute coverage for negative strands")
var negativeStrands: Boolean = false
@Args4jOption(required = false, name = "-positiveStrands", usage = "Compute coverage for positive strands")
var positiveStrands: Boolean = false
}

class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
val companion: BDGCommandCompanion = CalculateDepth

def run(sc: SparkContext): Unit = {

val proj = Projection(contigName, start, end, cigar)

// load reads
val readsRDD: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, projection = Some(proj))

// if strand direction is not specified, save unified coverage
if (!args.negativeStrands && !args.positiveStrands) {

// save final features
val featureRDD: CoverageRDD = readsRDD.toCoverage
featureRDD.save(args.outputPath)

} else {

// if negative strand, save coverage of only negative strands
if (args.negativeStrands) {
// count sites for all strands
val negativeReadsRDD: AlignmentRecordRDD = AlignedReadRDD(readsRDD.rdd.filter(_.getReadNegativeStrand), readsRDD.sequences, readsRDD.recordGroups)

// save final features
val coverageRDD: CoverageRDD = negativeReadsRDD.toCoverage
coverageRDD.save(s"negative_${args.outputPath}")
}

// if positive strand, save coverage of only positive strands
if (args.positiveStrands) {
// count sites for all strands
val positiveReadsRDD: AlignmentRecordRDD = AlignedReadRDD(readsRDD.rdd.filter(!_.getReadNegativeStrand), readsRDD.sequences, readsRDD.recordGroups)

// save final features
val coverageRDD: CoverageRDD = positiveReadsRDD.toCoverage
coverageRDD.save(s"negative_${args.outputPath}")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.utils.cli.Args4j

class Reads2CoverageSuite extends ADAMFunSuite {

sparkTest("correctly calculates coverage from small sam file") {
val inputPath = copyResource("artificial.sam")
val outputPath = tmpFile("coverage.adam")

val args: Array[String] = Array(inputPath, outputPath)
new Reads2Coverage(Args4j[Reads2CoverageArgs](args)).run(sc)
val coverage = sc.loadCoverage(outputPath)

val pointCoverage = coverage.rdd.filter(_.position == 30).first
assert(pointCoverage.count == 5)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.projections.{ AlignmentRecordField, NucleotideContigFragmentField, Projection }
import org.bdgenomics.adam.projections.{ FeatureField, AlignmentRecordField, NucleotideContigFragmentField, Projection }
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.adam.rdd.features._
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
Expand Down Expand Up @@ -757,6 +757,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
FragmentRDD.fromRdd(records.map(fastqRecordConverter.convertFragment))
}

def loadCoverage(filePath: String, minPartitions: Option[Int] = None): CoverageRDD = {
val proj = Projection(FeatureField.contigName, FeatureField.start, FeatureField.end, FeatureField.score)
CoverageRDD(loadParquetFeatures(filePath, projection = Some(proj)))
}

def loadGff3(filePath: String, minPartitions: Option[Int] = None): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)).flatMap(new GFF3Parser().parse)
if (Metrics.isRecording) records.instrument() else records
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.features

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion

/**
* RDD holding run length encoded coverage in a FeatureRDD. CoverageRDD stores
* coverage counts at a ReferenceRegion in the score attribute of
* a feature.
*
* @param featureRDD FeatureRDD holding records for coverage and corresponding
* referenceRegion
*/
case class CoverageRDD(featureRDD: FeatureRDD) {

/**
* Java friendly save function. Automatically detects the output format.
*
* If the filename ends in ".bed", we write a BED file. If the file name ends
* in ".narrow[pP]eak", we save in the NarrowPeak format. If the file name
* ends in ".interval_list", we save in the interval list format. Else, we
* save as Parquet. These files are written as sharded text files.
*
* @param filePath The location to write the output.
*/
def save(filePath: java.lang.String) = {
featureRDD.save(filePath)
}

/**
* Gets coverage overlapping specified ReferenceRegion. For large, ReferenceRegions,
* base pairs per bin (bpPerBin) can be specified to bin together ReferenceRegions of
* equal size.
*
* @param region ReferenceRegion to fetch overlapping coverage from
* @param bpPerBin base pairs per bin, number of bases to combine to one bin
* @return RDD of Coverage Records
*/
def getCoverage(region: ReferenceRegion, bpPerBin: Int = 1): RDD[Coverage] = {
val coverage = featureRDD.filterByOverlappingRegion(region)

val flattenedCoverage = coverage.rdd.flatMap(r => {
val positions: List[Long] = List.range(r.getStart, r.getEnd)
positions.map(n => Coverage(r.getContigName, n, r.getScore))
}).filter(r => r.position >= region.start && r.position < region.end) // filter out positions from flanking regions

flattenedCoverage.filter(r => r.position % bpPerBin == 0)
}

/**
* Gets raw RDD of coverage
* @return RDD[Coverage] from underlying FeatureRDD
*/
def rdd: RDD[Coverage] = {
featureRDD.rdd.flatMap(r => {
val positions: List[Long] = List.range(r.getStart, r.getEnd)
positions.map(n => Coverage(r.getContigName, n, r.getScore))
})
}
}

/**
* Coverage Recordfor CoverageRDD
* @param referenceName Specifies chromosomal location of coverage
* @param position Specifies position of coverage
* @param count Specifies count of coverage at location
*/
case class Coverage(referenceName: String, position: Long, count: Double)
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.features.{ CoverageRDD, FeatureRDD }
import org.bdgenomics.adam.rdd.{
AvroReadGroupGenomicRDD,
ADAMSaveAnyArgs,
Expand Down Expand Up @@ -84,6 +85,35 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
recordGroups)
}

/**
* Convert this set of reads into Coverage records
*
* @return Returns a CoverageRDD where all reads have been grouped and
* counted at all overlapping positions
*/
def toCoverage: CoverageRDD = {
val coverage: RDD[(ReferenceRegion, Int)] =
rdd.rdd
.flatMap(r => {
val t: List[Long] = List.range(r.getStart, r.getEnd)
val m: List[(ReferenceRegion, Int)] = t.map(n => (ReferenceRegion(r.getContigName, n, n + 1), 1))
m
}).reduceByKey(_ + _).sortByKey()

// TODO: merge adjacent coverage
val features: RDD[Feature] = coverage.map(r => {
val fb = Feature.newBuilder()
fb.setContigName(r._1.referenceName)
fb.setStart(r._1.start)
fb.setEnd(r._1.end)
fb.setScore(r._2.toDouble)
fb.build()
})

CoverageRDD(FeatureRDD(features,
sequences))
}

/**
* Returns all reference regions that overlap this read.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.bdgenomics.adam.algorithms.consensus.Consensus
import org.bdgenomics.adam.converters.FastaConverter.FastaDescriptionLine
import org.bdgenomics.adam.converters.FragmentCollector
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.features.Coverage
import org.bdgenomics.adam.rdd.read.realignment._
import org.bdgenomics.adam.rdd.read.recalibration.{ CovariateKey, CovariateSpace, CycleCovariate, DinucCovariate, Observation, ObservationAccumulator }
import org.bdgenomics.adam.rdd.read.{ DuplicateMetrics, FlagStatMetrics }
Expand Down Expand Up @@ -108,6 +109,9 @@ class ADAMKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Variant], new AvroSerializer[Variant])
kryo.register(classOf[Array[Variant]])

kryo.register(classOf[Coverage])
kryo.register(classOf[Array[Coverage]])

kryo.register(classOf[DatabaseVariantAnnotation], new AvroSerializer[DatabaseVariantAnnotation])
kryo.register(classOf[Array[DatabaseVariantAnnotation]])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.features

import com.google.common.collect.ImmutableMap
import java.io.File
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceRecord, SequenceDictionary }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.formats.avro.{ Feature, Strand }
import org.scalactic.{ Equivalence, TypeCheckedTripleEquals }

class CoverageRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {

def tempLocation(suffix: String = ".adam"): String = {
val tempFile = File.createTempFile("FeatureRDDFunctionsSuite", "")
val tempDir = tempFile.getParentFile
new File(tempDir, tempFile.getName + suffix).getAbsolutePath
}

val sd = new SequenceDictionary(Vector(SequenceRecord("chr1", 2000L),
SequenceRecord("chr2", 20000L)))

sparkTest("correctly flatmaps coverage") {
val f1 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(10).setScore(3.0).build()
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), sd)
val coverageRDD: CoverageRDD = CoverageRDD(featureRDD)

val region = ReferenceRegion("chr1", 5, 17)
val coverage = coverageRDD.getCoverage(region)
assert(coverage.count == 7)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import java.nio.file.Files
import htsjdk.samtools.ValidationStringency
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{
ReferenceRegion,
RecordGroupDictionary,
SequenceDictionary,
SequenceRecord
}
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.TestSaveArgs
import org.bdgenomics.adam.rdd.features.{ CoverageRDD, FeatureRDD }
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.formats.avro._
import scala.util.Random
Expand Down Expand Up @@ -79,6 +81,16 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
assert(expectedSortedReads === mapped)
}

sparkTest("computes coverage") {
val inputPath = resourcePath("artificial.sam")
val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath)

// get pileup at position 30
val pointCoverage = reads.filterByOverlappingRegion(ReferenceRegion("artificial", 30, 31)).rdd.count
val coverage: CoverageRDD = reads.toCoverage
assert(coverage.rdd.filter(r => r.position == 30).first.count == pointCoverage)
}

sparkTest("sorting reads by reference index") {
val random = new Random("sortingIndices".hashCode)
val numReadsToCreate = 1000
Expand Down

0 comments on commit 1624df6

Please sign in to comment.