refactored ReferenceFile to require SequenceDictionary
Added support for coverage calculation from alignment files

[ADAM-1122] Remove Coverage and PairingRDD. Resolves #1122.
akmorrow13 committed Aug 23, 2016
1 parent 2544a3d commit 6d54630
Showing 22 changed files with 759 additions and 81 deletions.
@@ -51,7 +51,8 @@ object ADAMMain {
Features2ADAM,
WigFix2Bed,
Fragments2Reads,
Reads2Fragments
Reads2Fragments,
Reads2Coverage
)
),
CommandGroup(
@@ -0,0 +1,86 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.projections.AlignmentRecordField._
import org.bdgenomics.adam.projections.Projection
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.features.CoverageRDD
import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
* Reads2Coverage (accessible as the command 'reads2coverage' through the CLI) takes two arguments,
* an INPUT and an OUTPUT, and calculates the read coverage (the number of reads overlapping each
* locus) from INPUT, writing the result to OUTPUT. Optional arguments are only_negative_strands,
* only_positive_strands and collapse. The strand options restrict coverage to reads on the
* negative or positive strand, respectively, and collapse specifies whether neighboring coverage
* records with the same count should be merged into a single record when saved.
*/
object Reads2Coverage extends BDGCommandCompanion {
val commandName: String = "reads2coverage"
val commandDescription: String = "Calculate the coverage from a given ADAM file"

def apply(cmdLine: Array[String]): BDGCommand = {
new Reads2Coverage(Args4j[Reads2CoverageArgs](cmdLine))
}
}

class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "The reads file to use to calculate depths", index = 0)
var inputPath: String = null
@Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the coverage data in ADAM/Parquet format", index = 1)
var outputPath: String = null
@Args4jOption(required = false, name = "-collapse", usage = "Collapses neighboring coverage records " +
"of equal counts into the same record")
var collapse: Boolean = true
@Args4jOption(required = false, name = "-only_negative_strands", usage = "Compute coverage for negative strands")
var onlyNegativeStrands: Boolean = false
@Args4jOption(required = false, name = "-only_positive_strands", usage = "Compute coverage for positive strands")
var onlyPositiveStrands: Boolean = false
}

class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
val companion: BDGCommandCompanion = Reads2Coverage

def run(sc: SparkContext): Unit = {

val proj = Projection(contigName, start, end, cigar)

// If saving strand specific coverage, require that only one direction is specified
require(!(args.onlyNegativeStrands && args.onlyPositiveStrands),
"Cannot compute coverage for both negative and positive strands separately")

// load reads
val readsRdd: AlignmentRecordRDD = sc.loadAlignments(args.inputPath, projection = Some(proj))

val finalReads = if (args.onlyNegativeStrands && !args.onlyPositiveStrands) {
readsRdd.transform(rdd => rdd.filter(_.getReadNegativeStrand))
} else if (!args.onlyNegativeStrands && args.onlyPositiveStrands) {
readsRdd.transform(rdd => rdd.filter(!_.getReadNegativeStrand))
} else {
readsRdd
}

finalReads.toCoverage(args.collapse)
.save(args.outputPath)
}
}
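
For context, the same flow is available programmatically through the ADAMContext API; a minimal sketch, assuming an existing SparkContext and placeholder file paths (the boolean argument to toCoverage mirrors the -collapse flag above):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

// Minimal sketch of the reads2coverage flow; "sample.reads.adam" and
// "sample.coverage.adam" are placeholder paths.
def coverageSketch(sc: SparkContext): Unit = {
  // load aligned reads (SAM/BAM/ADAM), as Reads2Coverage.run does
  val reads = sc.loadAlignments("sample.reads.adam")
  // convert reads to coverage, merging neighboring records of equal
  // count into one record, and save as Parquet-encoded Features
  reads.toCoverage(true).save("sample.coverage.adam")
}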

@@ -0,0 +1,38 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.utils.cli.Args4j

class Reads2CoverageSuite extends ADAMFunSuite {

sparkTest("correctly calculates coverage from small sam file") {
val inputPath = copyResource("artificial.sam")
val outputPath = tmpFile("coverage.adam")

val args: Array[String] = Array(inputPath, outputPath)
new Reads2Coverage(Args4j[Reads2CoverageArgs](args)).run(sc)
val coverage = sc.loadCoverage(outputPath)

val pointCoverage = coverage.flatten.rdd.filter(_.start == 30).first
assert(pointCoverage.count == 5)
}
}
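
The flatten call above expands collapsed coverage into per-base records. A small hand-rolled illustration of that assumed semantics, using the Coverage model introduced below (the contig name "artificial" and the expansion logic are assumptions, written out by hand rather than via CoverageRDD):

import org.bdgenomics.adam.models.Coverage

// one collapsed record spanning [28, 33) with count 5
val collapsed = Coverage("artificial", 28L, 33L, 5.0)

// per-base expansion mirroring what flatten is assumed to do:
// one Coverage record per base position, all with the same count
val perBase = (collapsed.start until collapsed.end).map { pos =>
  Coverage(collapsed.contigName, pos, pos + 1L, collapsed.count)
}

// the record starting at position 30 carries count 5.0,
// matching the assertion in the test above
assert(perBase.find(_.start == 30L).get.count == 5.0)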

86 changes: 86 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala
@@ -0,0 +1,86 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.models

import org.apache.spark.rdd.RDD
import org.bdgenomics.formats.avro.Feature

/**
* Companion object for building Coverage from ReferenceRegions and avro Features.
*/
object Coverage {

/**
* Creates Coverage from ReferenceRegion and coverage count in that ReferenceRegion.
*
* @param region ReferenceRegion that the Coverage spans
* @param count Coverage count for each base pair in region
* @return Coverage spanning the specified ReferenceRegion
*/
private[adam] def apply(region: ReferenceRegion, count: Double): Coverage = {
Coverage(region.referenceName, region.start, region.end, count)
}

/**
* Creates Coverage from Feature, extracting region information and feature score for coverage.
*
* @param feature Feature to create coverage from
* @return Coverage spanning the specified feature
*/
private[adam] def apply(feature: Feature): Coverage = {
Coverage(feature.getContigName, feature.getStart, feature.getEnd, feature.getScore)
}

/**
* Creates an RDD of Coverage from RDD of Features.
*
* @param rdd RDD of Features to extract Coverage from
* @return RDD of Coverage spanning all features in rdd
*/
private[adam] def apply(rdd: RDD[Feature]): RDD[Coverage] = {
rdd.map(f => Coverage(f))
}
}

/**
* Coverage record for CoverageRDD.
* Contains a region indexed by contig name, start and end, as well as the coverage count
* at each base pair in that region.
*
* @param contigName Specifies chromosomal location of coverage
* @param start Specifies start position of coverage
* @param end Specifies end position of coverage
* @param count Coverage count at each base pair in the region
*/
case class Coverage(contigName: String, start: Long, end: Long, count: Double) {

/**
* Converts Coverage to Feature, setting Coverage count in the score attribute.
*
* @return Feature built from Coverage
*/
def toFeature: Feature = {
val fb = Feature.newBuilder()
fb.setContigName(contigName)
fb.setStart(start)
fb.setEnd(end)
fb.setScore(count)
fb.build()
}
}
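
A short round-trip sketch of the model above, together with the ReferenceRegion extractor added in the next file; only fields and methods shown in this commit are used:

import org.bdgenomics.adam.models.{ Coverage, ReferenceRegion }

// build a Coverage record and convert it to an avro Feature;
// the count is stored in the Feature's score attribute
val cov = Coverage("chr1", 100L, 200L, 12.0)
val feature = cov.toFeature
assert(feature.getScore == 12.0)

// ReferenceRegion(coverage), added below, recovers the spanned region
val region = ReferenceRegion(cov)
assert(region.referenceName == "chr1" && region.start == 100L && region.end == 200L)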

@@ -150,9 +150,25 @@ object ReferenceRegion {
}
}

/**
* Extracts ReferenceRegion from Feature
*
* @param feature Feature to extract ReferenceRegion from
* @return Extracted ReferenceRegion
*/
def apply(feature: Feature): ReferenceRegion = {
new ReferenceRegion(feature.getContigName, feature.getStart, feature.getEnd)
}

/**
* Extracts ReferenceRegion from Coverage
*
* @param coverage Coverage to extract ReferenceRegion from
* @return Extracted ReferenceRegion
*/
def apply(coverage: Coverage): ReferenceRegion = {
new ReferenceRegion(coverage.contigName, coverage.start, coverage.end)
}
}

/**
38 changes: 25 additions & 13 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -17,12 +17,12 @@
*/
package org.bdgenomics.adam.rdd

import collection.JavaConverters._
import java.io.{ File, FileNotFoundException, InputStream }
import java.util.regex.Pattern

import com.google.common.collect.ImmutableList
import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
import htsjdk.samtools.util.{ Interval, Locatable }
import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
import htsjdk.variant.vcf.VCFHeader
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
@@ -35,33 +35,35 @@ import org.apache.parquet.avro.{ AvroParquetInputFormat, AvroReadSupport }
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.projections.{ AlignmentRecordField, NucleotideContigFragmentField, Projection }
import org.bdgenomics.adam.projections.{
AlignmentRecordField,
FeatureField,
NucleotideContigFragmentField,
Projection
}
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.adam.rdd.features._
import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.adam.rdd.read.{
AlignedReadRDD,
AlignmentRecordRDD,
UnalignedReadRDD
}
import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD, UnalignedReadRDD }
import org.bdgenomics.adam.rdd.variation._
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.util.{ TwoBitFile, ReferenceContigMap, ReferenceFile }
import org.bdgenomics.adam.util.{ ReferenceContigMap, ReferenceFile, TwoBitFile }
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.instrumentation.Metrics
import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.utils.misc.HadoopUtil
import org.bdgenomics.utils.misc.Logging
import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
import org.seqdoop.hadoop_bam._
import org.seqdoop.hadoop_bam.util._

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.reflect.ClassTag

@@ -757,6 +759,17 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
FragmentRDD.fromRdd(records.map(fastqRecordConverter.convertFragment))
}

/**
* Loads a Parquet file of Features into a CoverageRDD.
* Coverage is stored in the score attribute of Feature.
*
* @param filePath File path to load coverage from
* @return CoverageRDD containing an RDD of Coverage
*/
def loadCoverage(filePath: String): CoverageRDD = {
val proj = Projection(FeatureField.contigName, FeatureField.start, FeatureField.end, FeatureField.score)
loadFeatures(filePath, projection = Some(proj)).toCoverage
}

def loadGff3(filePath: String, minPartitions: Option[Int] = None): FeatureRDD = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)).flatMap(new GFF3Parser().parse)
if (Metrics.isRecording) records.instrument() else records
@@ -999,7 +1012,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
} else if (filePath.endsWith(".fa") ||
filePath.endsWith(".fasta")) {
log.info(s"Loading $filePath as FASTA and converting to AlignmentRecords. Projection is ignored.")
import ADAMContext._
UnalignedReadRDD(loadFasta(filePath, fragmentLength = 10000).toReads,
RecordGroupDictionary.empty)
} else if (filePath.endsWith("contig.adam")) {
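A minimal sketch of the new loadCoverage entry point in use, assuming coverage previously written in ADAM/Parquet format ("sample.coverage.adam" is a placeholder path):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._

def loadCoverageSketch(sc: SparkContext): Unit = {
  // only contigName, start, end and score are materialized,
  // per the projection in loadCoverage above
  val coverage = sc.loadCoverage("sample.coverage.adam")
  // CoverageRDD wraps an RDD of Coverage records
  println(coverage.rdd.count())
}
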
@@ -27,6 +27,7 @@ import org.bdgenomics.adam.models.{
SequenceDictionary
}
import org.bdgenomics.adam.rdd.{ AvroGenomicRDD, JavaSaveArgs }
import org.bdgenomics.adam.util.ReferenceFile
import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment }
import scala.collection.JavaConversions._
import scala.math.max
@@ -53,9 +54,18 @@ object NucleotideContigFragmentRDD extends Serializable {
}
}

/**
* A wrapper class for RDD[NucleotideContigFragment].
*
* NucleotideContigFragmentRDD extends ReferenceFile. To wrap a plain
* RDD[NucleotideContigFragment] as a ReferenceFile, see ReferenceContigMap.
*
* @see ReferenceContigMap
*
* @param rdd Underlying RDD
* @param sequences Sequence dictionary computed from rdd
*/
case class NucleotideContigFragmentRDD(
rdd: RDD[NucleotideContigFragment],
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] {
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] with ReferenceFile {

/**
* Converts an RDD of nucleotide contig fragments into reads. Adjacent contig fragments are
@@ -169,7 +179,7 @@ case class NucleotideContigFragmentRDD(
* @param region Reference region over which to get sequence.
* @return String of bases corresponding to reference sequence.
*/
def getReferenceString(region: ReferenceRegion): String = {
def extract(region: ReferenceRegion): String = {
def getString(fragment: (ReferenceRegion, NucleotideContigFragment)): (ReferenceRegion, String) = {
val trimStart = max(0, region.start - fragment._1.start).toInt
val trimEnd = max(0, fragment._1.end - region.end).toInt
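Because NucleotideContigFragmentRDD now mixes in ReferenceFile, code written against the ReferenceFile trait can be handed contigs loaded through ADAM; a minimal sketch, assuming extract is the trait's access method as the rename above suggests:

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.util.ReferenceFile

// pull reference bases for a region from any ReferenceFile, whether a
// NucleotideContigFragmentRDD, a ReferenceContigMap or a TwoBitFile
def basesFor(reference: ReferenceFile): String = {
  reference.extract(ReferenceRegion("chr1", 100L, 200L))
}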
