[ADAM-1040] Remove implicit GenomicRDD to RDD conversion.
Resolves #1040.
fnothaft committed Jul 18, 2016
1 parent da2a142 commit 9c1437e
Showing 25 changed files with 134 additions and 129 deletions.
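The heart of the change is the removal of the implicit genomicRDDToRDD conversion from ADAMContext (see the ADAMContext.scala hunk below); a GenomicRDD no longer silently behaves like a Spark RDD, and every call site now unwraps it explicitly through its rdd member. A minimal before/after sketch of the calling pattern, assuming a SparkContext sc with the ADAMContext implicits in scope (the file path is illustrative):

    import org.apache.spark.rdd.RDD
    import org.bdgenomics.adam.rdd.ADAMContext._
    import org.bdgenomics.formats.avro.AlignmentRecord

    // Before: the implicit conversion unwrapped the GenomicRDD behind the scenes,
    // so RDD methods appeared to be available on the wrapper itself:
    //   val reads: RDD[AlignmentRecord] = sc.loadAlignments("reads.adam")
    //   println(reads.count())

    // After: the underlying RDD must be requested explicitly.
    val reads = sc.loadAlignments("reads.adam") // returns an AlignmentRecordRDD wrapper
    val records: RDD[AlignmentRecord] = reads.rdd
    println(records.count())

Making the unwrap explicit keeps the wrapper's metadata (such as its sequence dictionary) visible at the call site rather than being discarded by a hidden conversion.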
@@ -56,7 +56,7 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg
     val contigs = contigFragments.mergeFragments()

     val cc = if (args.coalesce > 0) {
-      if (args.coalesce > contigs.partitions.size || args.forceShuffle) {
+      if (args.coalesce > contigs.rdd.partitions.size || args.forceShuffle) {
         contigs.transform(_.coalesce(args.coalesce, shuffle = true))
       } else {
         contigs.transform(_.coalesce(args.coalesce, shuffle = false))
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.adam.rdd.variation.GenotypeRDD
 import org.bdgenomics.formats.avro.{ Genotype, GenotypeAllele }
 import org.bdgenomics.utils.cli._
 import org.bdgenomics.utils.misc.Logging
@@ -53,8 +54,8 @@ object AlleleCountHelper extends Serializable {
     case _ => None
   }

-  def countAlleles(adamVariants: RDD[Genotype], args: AlleleCountArgs) {
-    val usefulData = adamVariants.map(p => (
+  def countAlleles(adamVariants: GenotypeRDD, args: AlleleCountArgs) {
+    val usefulData = adamVariants.rdd.map(p => (
       p.getVariant.getContigName,
       p.getVariant.getStart,
       p.getVariant.getReferenceAllele,
@@ -74,7 +75,7 @@ class AlleleCount(val args: AlleleCountArgs) extends BDGSparkCommand[AlleleCount

   def run(sc: SparkContext) {

-    val adamVariants: RDD[Genotype] = sc.loadGenotypes(args.adamFile)
+    val adamVariants = sc.loadGenotypes(args.adamFile)
     AlleleCountHelper.countAlleles(adamVariants, args)
   }
 }
@@ -46,7 +46,7 @@ class PrintGenes(protected val args: PrintGenesArgs)
   val companion = PrintGenes

   def run(sc: SparkContext): Unit = {
-    val genes: RDD[Gene] = sc.loadGenes(args.gtfInput)
+    val genes: RDD[Gene] = sc.loadGenes(args.gtfInput).rdd

     genes.map(printGene).collect().foreach(println)
   }
@@ -127,7 +127,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
   def apply(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {

     var adamRecords = rdd
-    val sc = rdd.context
+    val sc = rdd.rdd.context
     val sl = StorageLevel.fromString(args.storageLevel)

     val stringencyOpt = Option(args.stringency).map(ValidationStringency.valueOf(_))
@@ -192,7 +192,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

     if (args.coalesce != -1) {
       log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
-      if (args.coalesce > adamRecords.partitions.size || args.forceShuffle) {
+      if (args.coalesce > adamRecords.rdd.partitions.size || args.forceShuffle) {
         adamRecords = adamRecords.transform(_.coalesce(args.coalesce, shuffle = true))
       } else {
         adamRecords = adamRecords.transform(_.coalesce(args.coalesce, shuffle = false))
@@ -359,6 +359,6 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
   }

   private def createKnownSnpsTable(sc: SparkContext): SnpTable = CreateKnownSnpsTable.time {
-    Option(args.knownSnpsFile).fold(SnpTable())(f => SnpTable(sc.loadVariants(f).map(new RichVariant(_))))
+    Option(args.knownSnpsFile).fold(SnpTable())(f => SnpTable(sc.loadVariants(f).rdd.map(new RichVariant(_))))
   }
 }
@@ -67,7 +67,7 @@ class Vcf2ADAM(val args: Vcf2ADAMArgs) extends BDGSparkCommand[Vcf2ADAMArgs] wit

     val variantContextRdd = sc.loadVcf(args.vcfPath, sdOpt = dictionary)
     var variantContextsToSave = if (args.coalesce > 0) {
-      if (args.coalesce > variantContextRdd.partitions.size || args.forceShuffle) {
+      if (args.coalesce > variantContextRdd.rdd.partitions.size || args.forceShuffle) {
         variantContextRdd.transform(_.coalesce(args.coalesce, shuffle = true))
       } else {
         variantContextRdd.transform(_.coalesce(args.coalesce, shuffle = false))
@@ -22,6 +22,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.bdgenomics.adam.models.VariantContext
 import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.adam.rdd.variation.DatabaseVariantAnnotationRDD
 import org.bdgenomics.adam.rich.RichVariant
 import org.bdgenomics.formats.avro._
 import org.bdgenomics.utils.cli._
@@ -52,14 +53,14 @@ class VcfAnnotation2ADAM(val args: VcfAnnotation2ADAMArgs) extends BDGSparkComma

   def run(sc: SparkContext) {
     log.info("Reading VCF file from %s".format(args.vcfFile))
-    val annotations: RDD[DatabaseVariantAnnotation] = sc.loadVcfAnnotations(args.vcfFile)
+    val annotations = sc.loadVcfAnnotations(args.vcfFile)

     if (args.currentAnnotations != null) {
-      val existingAnnotations: RDD[DatabaseVariantAnnotation] = sc.loadVariantAnnotations(args.currentAnnotations)
-      val keyedAnnotations = existingAnnotations.keyBy(anno => new RichVariant(anno.getVariant))
-      val joinedAnnotations = keyedAnnotations.join(annotations.keyBy(anno => new RichVariant(anno.getVariant)))
+      val existingAnnotations = sc.loadVariantAnnotations(args.currentAnnotations)
+      val keyedAnnotations = existingAnnotations.rdd.keyBy(anno => new RichVariant(anno.getVariant))
+      val joinedAnnotations = keyedAnnotations.join(annotations.rdd.keyBy(anno => new RichVariant(anno.getVariant)))
       val mergedAnnotations = joinedAnnotations.map(kv => VariantContext.mergeAnnotations(kv._2._1, kv._2._2))
-      mergedAnnotations.saveAsParquet(args)
+      DatabaseVariantAnnotationRDD(mergedAnnotations, existingAnnotations.sequences).saveAsParquet(args)
     } else {
       annotations.saveAsParquet(args)
     }
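The hunk above goes beyond adding .rdd: after the join runs on raw RDDs, the merged annotations are re-wrapped in a DatabaseVariantAnnotationRDD, carrying over the sequence dictionary from existingAnnotations, before being saved. A hedged sketch of that re-wrap step, with illustrative value names (merged stands in for the joined result):

    // Assumption: merged is the RDD[DatabaseVariantAnnotation] produced by the join above.
    // Re-attach the sequence dictionary so the saved Parquet keeps its metadata.
    val wrapped = DatabaseVariantAnnotationRDD(merged, existingAnnotations.sequences)
    wrapped.saveAsParquet(args)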
2 changes: 1 addition & 1 deletion adam-cli/src/main/scala/org/bdgenomics/adam/cli/View.scala
@@ -169,7 +169,7 @@ class View(val args: ViewArgs) extends BDGSparkCommand[ViewArgs] {
       reads.save(args)
     } else {
       if (args.printCount) {
-        println(reads.count())
+        println(reads.rdd.count())
       } else {
         println(reads.saveAsSamString())
       }
@@ -28,8 +28,8 @@ class Fasta2ADAMSuite extends ADAMFunSuite {
     val cmd = Fasta2ADAM(Array(inputPath, convertPath)).run(sc)

     val contigFragments = sc.loadParquetContigFragments(convertPath)
-    assert(contigFragments.count() === 26)
-    val first = contigFragments.first()
+    assert(contigFragments.rdd.count() === 26)
+    val first = contigFragments.rdd.first()
     assert(first.getContig.getContigName === "gi|224384749|gb|CM000682.1|")
     assert(first.getDescription === "Homo sapiens chromosome 20, GRCh37 primary reference assembly")
     assert(first.getFragmentNumber === 0)
@@ -80,7 +80,7 @@ class Features2ADAMSuite extends ADAMFunSuite {

     val schema = Projection(featureId, contigName, start, end, score)
     val rdd = sc.loadFeatures(outputPath, projection = Some(schema))
-    val converted = rdd.collect.toSeq.sortBy(f => f.getStart)
+    val converted = rdd.rdd.collect.toSeq.sortBy(f => f.getStart)

     assert(converted.size === 10)
     assert(converted(0).getContigName == "chr5")
@@ -50,7 +50,7 @@ class FlagStatSuite extends ADAMFunSuite {
       AlignmentRecordField.mapq,
       AlignmentRecordField.failedVendorQualityChecks)

-    val adamFile: RDD[AlignmentRecord] = sc.loadAlignments(args.inputPath, projection = Some(projection))
+    val adamFile: RDD[AlignmentRecord] = sc.loadAlignments(args.inputPath, projection = Some(projection)).rdd

     val (failedVendorQuality, passedVendorQuality) = apply(adamFile)

@@ -48,7 +48,7 @@ private[adam] class ConsensusGeneratorFromKnowns(file: String,
    * @return Returns an option which wraps an RDD of indel realignment targets.
    */
   def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = {
-    val rdd: RDD[Variant] = sc.loadVariants(file)
+    val rdd: RDD[Variant] = sc.loadVariants(file).rdd

     Some(rdd.filter(v => v.getReferenceAllele.length != v.getAlternateAllele.length)
       .map(v => ReferenceRegion(v.getContigName, v.getStart, v.getStart + v.getReferenceAllele.length))
@@ -58,7 +58,7 @@ private[adam] object IndelTable {
    * @return Returns a table with the known indels populated.
    */
   def apply(knownIndelsFile: String, sc: SparkContext): IndelTable = {
-    val rdd: RDD[Variant] = sc.loadVariants(knownIndelsFile)
+    val rdd: RDD[Variant] = sc.loadVariants(knownIndelsFile).rdd
     apply(rdd)
   }

@@ -105,8 +105,6 @@ object ADAMContext {
   implicit def iterableToJavaCollection[A](i: Iterable[A]): java.util.Collection[A] = asJavaCollection(i)

   implicit def setToJavaSet[A](set: Set[A]): java.util.Set[A] = setAsJavaSet(set)
-
-  implicit def genomicRDDToRDD[T, U <: GenomicRDD[T, U]](gRdd: GenomicRDD[T, U]): RDD[T] = gRdd.rdd
 }

 import org.bdgenomics.adam.rdd.ADAMContext._
@@ -558,8 +556,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

     stringency match {
       case ValidationStringency.STRICT | ValidationStringency.LENIENT =>
-        val count1 = reads1.cache.count
-        val count2 = reads2.cache.count
+        val count1 = reads1.rdd.cache.count
+        val count2 = reads2.rdd.cache.count

         if (count1 != count2) {
           val msg = s"Fastq 1 ($filePath1) has $count1 reads, fastq 2 ($filePath2) has $count2 reads"
@@ -573,7 +571,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
       case ValidationStringency.SILENT =>
     }

-    UnalignedReadRDD.fromRdd(reads1 ++ reads2)
+    UnalignedReadRDD.fromRdd(reads1.rdd ++ reads2.rdd)
   }

   def loadUnpairedFastq(
@@ -865,7 +863,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
       //TODO(ryan): S3ByteAccess
       new TwoBitFile(new LocalFileByteAccess(new File(filePath)))
     } else {
-      ReferenceContigMap(loadSequences(filePath, fragmentLength = fragmentLength))
+      ReferenceContigMap(loadSequences(filePath, fragmentLength = fragmentLength).rdd)
     }
   }

@@ -29,7 +29,7 @@ class ConsensusGeneratorFromReadsSuite extends ADAMFunSuite {

   def artificial_reads: RDD[AlignmentRecord] = {
     val path = resourcePath("artificial.sam")
-    sc.loadAlignments(path)
+    sc.loadAlignments(path).rdd
   }

   sparkTest("checking search for consensus list for artificial reads") {
@@ -210,7 +210,7 @@ class FastaConverterSuite extends ADAMFunSuite {

   sparkTest("convert reference fasta file") {
     //Loading "human_g1k_v37_chr1_59kb.fasta"
-    val referenceSequences = sc.loadSequences(chr1File, fragmentLength = 10).collect()
+    val referenceSequences = sc.loadSequences(chr1File, fragmentLength = 10).rdd.collect()
     assert(referenceSequences.forall(_.getContig.getContigName.toString == "1"))
     assert(referenceSequences.slice(0, referenceSequences.length - 2).forall(_.getFragmentSequence.length == 10))

@@ -35,7 +35,7 @@ class GeneSuite extends ADAMFunSuite {

     val fixedParentIds = features.reassignParentIds

-    val genes = fixedParentIds.toGenes()
+    val genes = fixedParentIds.toGenes().rdd
     assert(genes.count() === 4)

     val transcripts = genes.flatMap(_.transcripts)
@@ -51,7 +51,7 @@ class GeneSuite extends ADAMFunSuite {

     val fixedParentIds = features.reassignParentIds

-    val genes = fixedParentIds.toGenes()
+    val genes = fixedParentIds.toGenes().rdd
     assert(genes.count() === 8)

     val transcripts = genes.flatMap(_.transcripts)
@@ -102,7 +102,7 @@ class GeneSuite extends ADAMFunSuite {
     val features = sc.loadFeatures(path)
     val fixedParentIds = features.reassignParentIds
     val genes = fixedParentIds.toGenes()
-    val transcripts: Seq[Transcript] = genes.flatMap(g => g.transcripts).take(100)
+    val transcripts: Seq[Transcript] = genes.rdd.flatMap(g => g.transcripts).take(100)

     transcripts.foreach { transcript =>
       val mySequence = transcript.extractSplicedmRNASequence(chr20sequence.get)
@@ -42,15 +42,15 @@ class FieldEnumerationSuite extends ADAMFunSuite {
     rRdd.saveAsParquet(TestSaveArgs(readsParquetFilepath))

     val p1 = Projection(AlignmentRecordField.readName)
-    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p1))
+    val reads1: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p1)).rdd
     assert(reads1.count() === 200)

     val first1 = reads1.first()
     assert(first1.getReadName.toString === "simread:1:26472783:false")
     assert(first1.getReadMapped === false)

     val p2 = Projection(AlignmentRecordField.readName, AlignmentRecordField.readMapped)
-    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p2))
+    val reads2: RDD[AlignmentRecord] = sc.loadAlignments(readsParquetFilepath, projection = Some(p2)).rdd
     assert(reads2.count() === 200)

     val first2 = reads2.first()
(Diff truncated: the remaining 8 of the 25 changed files are not shown.)