Skip to content

Commit

Permalink
modified CoverageRDD and FeatureRDD to extend MultisampleGenomicDataset
Browse files Browse the repository at this point in the history
  • Loading branch information
akmorrow13 committed Nov 13, 2018
1 parent de21474 commit 5a57421
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.bdgenomics.adam.cli

import java.io._
import org.bdgenomics.adam.projections.Projection
import org.bdgenomics.adam.projections.FeatureField._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli.Args4j
import org.bdgenomics.formats.avro.Feature

class TransformFeaturesSuite extends ADAMFunSuite {

Expand All @@ -44,9 +42,7 @@ class TransformFeaturesSuite extends ADAMFunSuite {
val features2Adam = new TransformFeatures(args)
features2Adam.run(sc)

val schema = Projection(featureId, contigName, start, strand)
val lister = new ParquetLister[Feature](Some(schema))
val converted = lister.materialize(outputPath).toSeq
val converted = sc.loadFeatures(outputPath).rdd.collect

assert(converted.size === 10)
assert(converted.find(_.getContigName != "chr1").isEmpty)
Expand Down
23 changes: 16 additions & 7 deletions adam-core/src/main/scala/org/bdgenomics/adam/models/Coverage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ private[adam] object Coverage {
*
* @param region ReferenceRegion in which Coverage spans
* @param count Coverage count for each base pair in region
* @param optSampleId Option of sampleId for this Coverage record
* @return Coverage spanning the specified ReferenceRegion
*/
def apply(region: ReferenceRegion, count: Double): Coverage = {
Coverage(region.referenceName, region.start, region.end, count)
def apply(region: ReferenceRegion, count: Double, optSampleId: Option[String]): Coverage = {
Coverage(region.referenceName, region.start, region.end, count, optSampleId)
}

/**
Expand All @@ -54,7 +55,8 @@ private[adam] object Coverage {
Coverage(feature.getContigName,
feature.getStart,
feature.getEnd,
feature.getScore)
feature.getScore,
Option(feature.getSampleId))
}

/**
Expand All @@ -79,29 +81,36 @@ private[adam] object Coverage {
* observed.
* @param end The end coordinate of the region where this coverage value was
* observed.
* @param count The average coverage across this region.
* @param optSampleId Option of sampleId for this Coverage record
*/
case class Coverage(contigName: String, start: Long, end: Long, count: Double) {
case class Coverage(contigName: String, start: Long, end: Long, count: Double, optSampleId: Option[String] = None) {

/**
* Converts Coverage to Feature, setting Coverage count in the score attribute.
*
* @return Feature built from Coverage
*/
def toFeature: Feature = {
Feature.newBuilder()
val featureBuilder = Feature.newBuilder()
.setContigName(contigName)
.setStart(start)
.setEnd(end)
.setScore(count)
.build()
}

// set sampleId, if applicable
if (optSampleId.isDefined) {
featureBuilder.setSampleId(optSampleId.get)
}

featureBuilder.build()
}
/**
* Converts Coverage to a Feature case class, for use with Spark SQL.
*/
def toSqlFeature: FeatureProduct = {
new FeatureProduct(featureId = None,
sampleId = optSampleId,
name = None,
source = None,
featureType = None,
Expand Down
78 changes: 41 additions & 37 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,25 @@ object ADAMContext {
implicit def contigsToCoverageConversionFn(
gRdd: NucleotideContigFragmentRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def contigsToCoverageDatasetConversionFn(
gRdd: NucleotideContigFragmentRDD,
ds: Dataset[Coverage]): CoverageRDD = {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
new DatasetBoundCoverageRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def contigsToFeaturesConversionFn(
gRdd: NucleotideContigFragmentRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def contigsToFeaturesDatasetConversionFn(
gRdd: NucleotideContigFragmentRDD,
ds: Dataset[FeatureProduct]): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
new DatasetBoundFeatureRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def contigsToFragmentsConversionFn(
Expand Down Expand Up @@ -279,13 +279,13 @@ object ADAMContext {
implicit def coverageToFeaturesConversionFn(
gRdd: CoverageRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def coverageToFeaturesDatasetConversionFn(
gRdd: CoverageRDD,
ds: Dataset[FeatureProduct]): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
new DatasetBoundFeatureRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def coverageToFragmentsConversionFn(
Expand Down Expand Up @@ -386,13 +386,13 @@ object ADAMContext {
implicit def featuresToCoverageConversionFn(
gRdd: FeatureRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def featuresToCoverageDatasetConversionFn(
gRdd: FeatureRDD,
ds: Dataset[Coverage]): CoverageRDD = {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
new DatasetBoundCoverageRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def featuresToFeaturesConversionFn(gRdd: FeatureRDD,
Expand Down Expand Up @@ -499,25 +499,25 @@ object ADAMContext {
implicit def fragmentsToCoverageConversionFn(
gRdd: FragmentRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def fragmentsToCoverageDatasetConversionFn(
gRdd: FragmentRDD,
ds: Dataset[Coverage]): CoverageRDD = {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
new DatasetBoundCoverageRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def fragmentsToFeaturesConversionFn(
gRdd: FragmentRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def fragmentsToFeaturesDatasetConversionFn(
gRdd: FragmentRDD,
ds: Dataset[FeatureProduct]): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
new DatasetBoundFeatureRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def fragmentsToFragmentsConversionFn(gRdd: FragmentRDD,
Expand Down Expand Up @@ -599,13 +599,13 @@ object ADAMContext {
implicit def genericToCoverageConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def genericToFeatureConversionFn[Y <: GenericGenomicDataset[_, _]](
gRdd: Y,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def genericToFragmentsConversionFn[Y <: GenericGenomicDataset[_, _]](
Expand Down Expand Up @@ -672,25 +672,25 @@ object ADAMContext {
implicit def alignmentRecordsToCoverageConversionFn(
gRdd: AlignmentRecordRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def alignmentRecordsToCoverageDatasetConversionFn(
gRdd: AlignmentRecordRDD,
ds: Dataset[Coverage]): CoverageRDD = {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
new DatasetBoundCoverageRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def alignmentRecordsToFeaturesConversionFn(
gRdd: AlignmentRecordRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def alignmentRecordsToFeaturesDatasetConversionFn(
gRdd: AlignmentRecordRDD,
ds: Dataset[FeatureProduct]): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
new DatasetBoundFeatureRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def alignmentRecordsToFragmentsConversionFn(
Expand Down Expand Up @@ -778,25 +778,25 @@ object ADAMContext {
implicit def genotypesToCoverageConversionFn(
gRdd: GenotypeRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def genotypesToCoverageDatasetConversionFn(
gRdd: GenotypeRDD,
ds: Dataset[Coverage]): CoverageRDD = {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
new DatasetBoundCoverageRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def genotypesToFeaturesConversionFn(
gRdd: GenotypeRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def genotypesToFeaturesDatasetConversionFn(
gRdd: GenotypeRDD,
ds: Dataset[FeatureProduct]): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
new DatasetBoundFeatureRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def genotypesToFragmentsConversionFn(
Expand Down Expand Up @@ -884,25 +884,25 @@ object ADAMContext {
implicit def variantsToCoverageConversionFn(
gRdd: VariantRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def variantsToCoverageDatasetConversionFn(
gRdd: VariantRDD,
ds: Dataset[Coverage]): CoverageRDD = {
new DatasetBoundCoverageRDD(ds, gRdd.sequences)
new DatasetBoundCoverageRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def variantsToFeaturesConversionFn(
gRdd: VariantRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def variantsToFeaturesDatasetConversionFn(
gRdd: VariantRDD,
ds: Dataset[FeatureProduct]): FeatureRDD = {
new DatasetBoundFeatureRDD(ds, gRdd.sequences)
new DatasetBoundFeatureRDD(ds, gRdd.sequences, Seq.empty[Sample])
}

implicit def variantsToFragmentsConversionFn(
Expand Down Expand Up @@ -986,13 +986,13 @@ object ADAMContext {
implicit def variantContextsToCoverageConversionFn(
gRdd: VariantContextRDD,
rdd: RDD[Coverage]): CoverageRDD = {
new RDDBoundCoverageRDD(rdd, gRdd.sequences, None)
new RDDBoundCoverageRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def variantContextsToFeaturesConversionFn(
gRdd: VariantContextRDD,
rdd: RDD[Feature]): FeatureRDD = {
new RDDBoundFeatureRDD(rdd, gRdd.sequences, None)
new RDDBoundFeatureRDD(rdd, gRdd.sequences, Seq.empty[Sample], None)
}

implicit def variantContextsToFragmentsConversionFn(
Expand Down Expand Up @@ -2665,13 +2665,15 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (optPredicate.isEmpty && !forceRdd) {
// convert avro to sequence dictionary
val sd = loadAvroSequenceDictionary(pathName)
val samples = loadAvroSamples(pathName)

new ParquetUnboundCoverageRDD(sc, pathName, sd)
new ParquetUnboundCoverageRDD(sc, pathName, sd, samples)
} else {
val coverageFields = Projection(FeatureField.contigName,
FeatureField.start,
FeatureField.end,
FeatureField.score)
FeatureField.score,
FeatureField.sampleId)
loadParquetFeatures(pathName,
optPredicate = optPredicate,
optProjection = Some(coverageFields))
Expand Down Expand Up @@ -2702,7 +2704,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _, Seq.empty))
}

/**
Expand All @@ -2728,7 +2730,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _, Seq.empty))
}

/**
Expand All @@ -2754,7 +2756,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _, Seq.empty))
}

/**
Expand All @@ -2780,7 +2782,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _, Seq.empty))
}

/**
Expand All @@ -2805,7 +2807,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
parsedLines.flatMap(_._2))

if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, seqDict)
FeatureRDD(records, seqDict, Seq.empty)
}

/**
Expand All @@ -2825,16 +2827,17 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
optProjection: Option[Schema] = None): FeatureRDD = {

val sd = loadAvroSequenceDictionary(pathName)
val samples = loadAvroSamples(pathName)

(optPredicate, optProjection) match {
case (None, None) => {
ParquetUnboundFeatureRDD(sc, pathName, sd)
ParquetUnboundFeatureRDD(sc, pathName, sd, samples)
}
case (_, _) => {
// load from disk
val rdd = loadParquet[Feature](pathName, optPredicate, optProjection)

new RDDBoundFeatureRDD(rdd, sd, optPartitionMap = extractPartitionMap(pathName))
new RDDBoundFeatureRDD(rdd, sd, samples, optPartitionMap = extractPartitionMap(pathName))
}
}
}
Expand All @@ -2858,6 +2861,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val features = loadParquetFeatures(pathName)
val featureDatasetBound = DatasetBoundFeatureRDD(features.dataset,
features.sequences,
features.samples,
isPartitioned = true,
Some(partitionedBinSize),
optLookbackPartitions
Expand Down
Loading

0 comments on commit 5a57421

Please sign in to comment.