From 137ac724a96d0f04a33964e024659b08127f829d Mon Sep 17 00:00:00 2001 From: Paschall Date: Fri, 19 Jan 2018 10:25:14 -0500 Subject: [PATCH 1/5] hive-style partitioning of parquet files by genomic position removed addChrPrefix parameter Address PR comments - part 1 Address PR comments - part 2 fix nits Rebased Address review comments - part 3 address reviewer comments - white space and redundant asserts fixed isPartitioned fixed codacy issue with return in isPartitioned made writePartitionedParquetFlag private updated overlap calc in referenceRegionsToDatasetQueryString try2 add filter in genomicRDD move filterOverlapping to alignment record another try trying as filterDatasetByOverlappingRegions move filter mapq out of AlignmentRecord clean up, ready to retest with Mango feb 12th morning added GenotypeRDD filterByOverlappingRegions support dataset region filter in all types updated ADAMContext to use filterByOverlap and added docs fix doc indent by mvn removed public referenceRegionQueryString function from ADAMContext --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 153 ++++++++++++++++++ .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 68 ++++++++ .../contig/NucleotideContigFragmentRDD.scala | 17 ++ .../adam/rdd/feature/FeatureRDD.scala | 14 ++ .../adam/rdd/fragment/FragmentRDD.scala | 13 ++ .../adam/rdd/read/AlignmentRecordRDD.scala | 15 +- .../adam/rdd/variant/GenotypeRDD.scala | 13 ++ .../adam/rdd/variant/VariantRDD.scala | 13 ++ adam-core/src/test/resources/multi_chr.sam | 7 + .../NucleotideContigFragmentRDDSuite.scala | 31 ++++ .../adam/rdd/feature/FeatureRDDSuite.scala | 23 +++ .../rdd/read/AlignmentRecordRDDSuite.scala | 35 ++++ .../adam/rdd/variant/GenotypeRDDSuite.scala | 8 + .../adam/rdd/variant/VariantRDDSuite.scala | 15 ++ 14 files changed, 424 insertions(+), 1 deletion(-) create mode 100644 adam-core/src/test/resources/multi_chr.sam diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 429868a3e6..6b9f04a39a 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -1866,6 +1866,37 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + /** + * Load a path name in range-binned partitioned Parquet + Avro format into an AlignmentRecordRDD. + * + * @note The sequence dictionary is read from an Avro file stored at + * pathName/_seqdict.avro and the record group dictionary is read from an + * Avro file stored at pathName/_rgdict.avro. These files are pure Avro, + * not Parquet + Avro. + * + * @param pathName The path name to load alignment records from. + * Globs/directories are supported. + * @param regions Optional list of genomic regions to load. + * @return Returns an AlignmentRecordRDD.
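+ * + * A usage sketch (the path and region coordinates below are illustrative, not taken from the test data): + * {{{ + * val reads = sc.loadPartitionedParquetAlignments("sample.reads.adam", + * Seq(ReferenceRegion("1", 240000000L, 241000000L))) + * }}}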
+ */ + def loadPartitionedParquetAlignments(pathName: String, + regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = { + + require(isPartitioned(pathName), + "Input Parquet files (%s) are not partitioned.".format(pathName)) + + val reads = loadParquetAlignments(pathName) + + val datasetBoundAlignmentRecordRDD = if (regions.nonEmpty) { + DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps) + .filterByOverlappingRegions(regions) + } else { + DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps) + } + + datasetBoundAlignmentRecordRDD + } + /** * Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD. * @@ -2249,6 +2280,32 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + /** + * Load a path name in range-binned partitioned Parquet + Avro format into a GenotypeRDD. + * + * @param pathName The path name to load genotypes from. + * Globs/directories are supported. + * @param regions Optional list of genomic regions to load. + * @return Returns a GenotypeRDD. + */ + def loadPartitionedParquetGenotypes(pathName: String, + regions: Iterable[ReferenceRegion] = Iterable.empty): GenotypeRDD = { + + require(isPartitioned(pathName), + "Input Parquet files (%s) are not partitioned.".format(pathName)) + + val genotypes = loadParquetGenotypes(pathName) + + val datasetBoundGenotypeRDD = if (regions.nonEmpty) { + DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines) + .filterByOverlappingRegions(regions) + } else { + DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines) + } + + datasetBoundGenotypeRDD + } + /** * Load a path name in Parquet + Avro format into a VariantRDD. * @@ -2282,6 +2339,32 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + /** + * Load a path name in range-binned partitioned Parquet + Avro format into a VariantRDD. + * + * @param pathName The path name to load variants from. + * Globs/directories are supported. + * @param regions Optional list of genomic regions to load. + * @return Returns a VariantRDD. + */ + def loadPartitionedParquetVariants(pathName: String, + regions: Iterable[ReferenceRegion] = Iterable.empty): VariantRDD = { + + require(isPartitioned(pathName), + "Input Parquet files (%s) are not partitioned.".format(pathName)) + + val variants = loadParquetVariants(pathName) + + val datasetBoundVariantRDD = if (regions.nonEmpty) { + DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines) + .filterByOverlappingRegions(regions) + } else { + DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines) + } + + datasetBoundVariantRDD + } + /** * Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD. * @@ -2598,6 +2681,32 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + /** + * Load a path name in range-binned partitioned Parquet + Avro format into a FeatureRDD. + * + * @param pathName The path name to load features from. + * Globs/directories are supported. + * @param regions Optional list of genomic regions to load. + * @return Returns a FeatureRDD.
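+ * + * A usage sketch (illustrative path and region; the directory must have been written by saveAsPartitionedParquet): + * {{{ + * val features = sc.loadPartitionedParquetFeatures("features.adam", + * Seq(ReferenceRegion("chr1", 0L, 1000000L))) + * }}}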
+ */ + def loadPartitionedParquetFeatures(pathName: String, + regions: Iterable[ReferenceRegion] = Iterable.empty): FeatureRDD = { + + require(isPartitioned(pathName), + "Input Parquet files (%s) are not partitioned.".format(pathName)) + + val features = loadParquetFeatures(pathName) + + val datasetBoundFeatureRDD = if (regions.nonEmpty) { + DatasetBoundFeatureRDD(features.dataset, features.sequences) + .filterByOverlappingRegions(regions) + } else { + DatasetBoundFeatureRDD(features.dataset, features.sequences) + } + + datasetBoundFeatureRDD + } + /** * Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD. * @@ -2630,6 +2739,32 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } } + /** + * Load a path name in range-binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD. + * + * @param pathName The path name to load nucleotide contig fragments from. + * Globs/directories are supported. + * @param regions Optional list of genomic regions to load. + * @return Returns a NucleotideContigFragmentRDD. + */ + def loadPartitionedParquetContigFragments(pathName: String, + regions: Iterable[ReferenceRegion] = Iterable.empty): NucleotideContigFragmentRDD = { + + require(isPartitioned(pathName), + "Input Parquet files (%s) are not partitioned.".format(pathName)) + + val contigs = loadParquetContigFragments(pathName) + + val datasetBoundNucleotideContigFragmentRDD = if (regions.nonEmpty) { + DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences) + .filterByOverlappingRegions(regions) + } else { + DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences) + } + + datasetBoundNucleotideContigFragmentRDD + } + /** * Load a path name in Parquet + Avro format into a FragmentRDD. * @@ -3051,4 +3186,22 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection) } } + + /** + * Return true if the specified path of Parquet + Avro files is partitioned. + * + * @param pathName Path in which to look for partitioned flag. + * @return Return true if the specified path of Parquet + Avro files is partitioned. + * Behavior is undefined if some paths in the glob contain the partitioned flag and some do not.
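+ * + * A usage sketch (illustrative path): + * {{{ + * val reads = if (sc.isPartitioned("sample.reads.adam")) { + * sc.loadPartitionedParquetAlignments("sample.reads.adam") + * } else { + * sc.loadParquetAlignments("sample.reads.adam") + * } + * }}}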
+ */ + def isPartitioned(pathName: String): Boolean = { + + try { + getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")) + true + } catch { + case e: FileNotFoundException => false + } + } + } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 59e401a3a2..2c880832db 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.{ Function => JFunction, Function2 } import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext } +import org.apache.spark.sql.functions._ import org.apache.spark.storage.StorageLevel import org.bdgenomics.adam.instrumentation.Timers._ import org.bdgenomics.adam.models.{ @@ -54,6 +55,7 @@ import scala.collection.JavaConversions._ import scala.math.min import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag +import scala.util.Try private[rdd] class JavaSaveArgs(var outputPath: String, var blockSize: Int = 128 * 1024 * 1024, @@ -2261,6 +2263,21 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Geno convFn.call(v, dsX) }) } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundGenomicRDD with ReferenceRegions filter applied. + */ + def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionSize: Option[Int] = Some(1000000), + optPartitionedLookBackNum: Option[Int] = Some(1)): V = { + filterByOverlappingRegions(querys) + } } trait GenomicRDDWithLineage[T, U <: GenomicRDDWithLineage[T, U]] extends GenomicRDD[T, U] { @@ -2468,6 +2485,22 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A saveSequences(filePath) } + protected def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000, partitionedLookBackNum: Int = 1): String = { + + // test if this dataset is bound to partitioned Parquet having the field positionBin + if (Try(dataset("positionBin")).isSuccess) { + regions.map(r => "(contigName=" + "\'" + r.referenceName + + "\' and positionBin >= \'" + ((scala.math.floor(r.start / partitionSize).toInt) - partitionedLookBackNum) + + "\' and positionBin < \'" + (scala.math.floor(r.end / partitionSize).toInt + 1) + + "\' and (end > " + r.start + " and start < " + r.end + "))") + .mkString(" or ") + } else { // if no positionBin field is found, then construct the query without bin optimization + regions.map(r => "(contigName=" + "\'" + r.referenceName + + "\' and (end > " + r.start + " and start < " + r.end + "))") + .mkString(" or ") + } + } + /** * Saves RDD as a directory of Parquet files.
* @@ -2543,4 +2576,39 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A def saveAsParquet(filePath: java.lang.String) { saveAsParquet(new JavaSaveArgs(filePath)) } + + /** + * Writes a flag file that marks the output directory as range-binned partitioned Parquet + Avro format. + * + * @param filePath Path to save the file at. + */ + private def writePartitionedParquetFlag(filePath: String): Boolean = { + val path = new Path(filePath, "_isPartitionedByStartPos") + val fs = path.getFileSystem(rdd.context.hadoopConfiguration) + fs.createNewFile(path) + } + + /** + * Saves this RDD to disk in range-binned partitioned Parquet + Avro format. + * + * @param filePath Path to save the file at. + * @param compressCodec Name of the compression codec to use. + * @param partitionSize Size of partitions used when writing Parquet, in base pairs. Defaults to 1000000. + */ + def saveAsPartitionedParquet(filePath: String, + compressCodec: CompressionCodecName = CompressionCodecName.GZIP, + partitionSize: Int = 1000000) { + log.warn("Saving directly as Hive-partitioned Parquet from SQL. " + + "Options other than compression codec are ignored.") + val df = toDF() + df.withColumn("positionBin", floor(df("start") / partitionSize)) + .write + .partitionBy("contigName", "positionBin") + .format("parquet") + .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase()) + .save(filePath) + writePartitionedParquetFlag(filePath) + saveMetadata(filePath) + } + } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala index ce47d5cf6d..e6be5f35f1 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala @@ -157,6 +157,23 @@ case class DatasetBoundNucleotideContigFragmentRDD private[rdd] ( newSequences: SequenceDictionary): NucleotideContigFragmentRDD = { copy(sequences = newSequences) } + + override def transformDataset(tFn: Dataset[NucleotideContigFragmentProduct] => Dataset[NucleotideContigFragmentProduct]): NucleotideContigFragmentRDD = { + copy(dataset = tFn(dataset)) + } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundNucleotideContigFragmentRDD with ReferenceRegions filter applied.
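+ * + * A usage sketch (contigs below stands for any dataset-bound NucleotideContigFragmentRDD; the region is illustrative): + * {{{ + * val slice = contigs.filterByOverlappingRegions(Seq(ReferenceRegion("1", 100000L, 200000L))) + * }}}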
+ */ + override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): NucleotideContigFragmentRDD = { + transformDataset(((d: Dataset[org.bdgenomics.adam.sql.NucleotideContigFragment]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + } } /** diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala index dda3414cc2..56e5b8d2a9 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala @@ -310,6 +310,20 @@ case class DatasetBoundFeatureRDD private[rdd] ( .withColumnRenamed("score", "count") .as[Coverage], sequences) } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundFeatureRDD with ReferenceRegions filter applied. + */ + override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): FeatureRDD = { + transformDataset(((d: Dataset[org.bdgenomics.adam.sql.Feature]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + } + } case class RDDBoundFeatureRDD private[rdd] ( diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala index ad741f927d..d085e13c33 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala @@ -222,6 +222,19 @@ case class DatasetBoundFragmentRDD private[rdd] ( newProcessingSteps: Seq[ProcessingStep]): FragmentRDD = { copy(processingSteps = newProcessingSteps) } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundFragmentRDD with ReferenceRegions filter applied.
+ */ + override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): FragmentRDD = { + transformDataset(((d: Dataset[org.bdgenomics.adam.sql.Fragment]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + } } case class RDDBoundFragmentRDD private[rdd] ( diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 1c3b3a0e4d..4f4df0c234 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -26,7 +26,7 @@ import java.nio.file.Paths import org.apache.hadoop.fs.Path import org.apache.hadoop.io.LongWritable import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.spark.SparkContext +import org.apache.spark.{ SparkContext } import org.apache.spark.api.java.JavaRDD import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.MetricsContext._ @@ -281,6 +281,19 @@ case class DatasetBoundAlignmentRecordRDD private[rdd] ( newProcessingSteps: Seq[ProcessingStep]): AlignmentRecordRDD = { copy(processingSteps = newProcessingSteps) } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundAlignmentRecordRDD with ReferenceRegions filter applied. + */ + override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): AlignmentRecordRDD = { + transformDataset(((d: Dataset[org.bdgenomics.adam.sql.AlignmentRecord]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + } } case class RDDBoundAlignmentRecordRDD private[rdd] ( diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala index e17eb0b2d2..cf70606b08 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala @@ -178,6 +178,19 @@ case class DatasetBoundGenotypeRDD private[rdd] ( def replaceSamples(newSamples: Iterable[Sample]): GenotypeRDD = { copy(samples = newSamples.toSeq) } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundGenotypeRDD with ReferenceRegions filter applied.
+ */ + override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): GenotypeRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.Genotype]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get))) + } } case class RDDBoundGenotypeRDD private[rdd] ( diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala index efc6c22bcc..2dc1fa946f 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala @@ -167,6 +167,19 @@ case class DatasetBoundVariantRDD private[rdd] ( def replaceHeaderLines(newHeaderLines: Seq[VCFHeaderLine]): VariantRDD = { copy(headerLines = newHeaderLines) } + + /** + * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. + * + * @param querys ReferenceRegions to filter against + * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. + * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a + * ReferenceRegion, defaults to 1 + * @return Returns a new DatasetBoundVariantRDD with ReferenceRegions filter applied. + */ + override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): VariantRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.Variant]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get))) + } } case class RDDBoundVariantRDD private[rdd] ( diff --git a/adam-core/src/test/resources/multi_chr.sam b/adam-core/src/test/resources/multi_chr.sam new file mode 100644 index 0000000000..5a8a65115b --- /dev/null +++ b/adam-core/src/test/resources/multi_chr.sam @@ -0,0 +1,7 @@ +@SQ SN:1 LN:249250621 +@SQ SN:2 LN:243199373 +@PG ID:p1 PN:myProg CL:"myProg 123" VN:1.0.0 +@PG ID:p2 PN:myProg CL:"myProg 456" VN:1.0.0 PP:p1 +simread:1:26472783:false 16 1 26472784 60 75M * 0 0 GTATAAGAGCAGCCTTATTCCTATTTATAATCAGGGTGAAACACCTGTGCCAATGCCAAGACAGGGGTGCCAAGA * NM:i:0 AS:i:75 XS:i:0 +simread:1:240997787:true 0 1 240997788 60 75M * 0 0 CTTTATTTTTATTTTTAAGGTTTTTTTTGTTTGTTTGTTTTGAGATGGAGTCTCGCTCCACCGCCCAGACTGGAG * NM:i:0 AS:i:75 XS:i:39 +simread:1:189606653:true 0 2 189606654 60 75M * 0 0 TGTATCTTCCTCCCCTGCTGTATGTTTCCTGCCCTCAAACATCACACTCCACGTTCTTCAGCTTTAGGACTTGGA * NM:i:0 AS:i:75 XS:i:0 diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala index be61bf87db..df94032273 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala @@ -140,6 +140,37 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite { assert(fragments3.dataset.count === 8L) } + sparkTest("round trip a ncf to partitioned parquet") { + def testMetadata(fRdd: NucleotideContigFragmentRDD) { + val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L)) + assert(sequenceRdd.sequences.containsRefName("aSequence")) + } + + val
fragments1 = sc.loadFasta(testFile("HLA_DQB1_05_01_01_02.fa"), 1000L) + assert(fragments1.rdd.count === 8L) + assert(fragments1.dataset.count === 8L) + testMetadata(fragments1) + + // save using dataset path + val output1 = tmpFile("ctg.adam") + val dsBound = fragments1.transformDataset(ds => ds) + testMetadata(dsBound) + dsBound.saveAsPartitionedParquet(output1) + val fragments2 = sc.loadPartitionedParquetContigFragments(output1) + testMetadata(fragments2) + assert(fragments2.rdd.count === 8L) + assert(fragments2.dataset.count === 8L) + + // save using rdd path + val output2 = tmpFile("ctg.adam") + val rddBound = fragments2.transform(rdd => rdd) + testMetadata(rddBound) + rddBound.saveAsPartitionedParquet(output2) + val fragments3 = sc.loadPartitionedParquetContigFragments(output2) + assert(fragments3.rdd.count === 8L) + assert(fragments3.dataset.count === 8L) + } + sparkTest("save fasta back as a single file") { val origFasta = testFile("artificial.fa") val tmpFasta = tmpFile("test.fa") diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala index da7fed911c..a84a48811e 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/feature/FeatureRDDSuite.scala @@ -925,6 +925,29 @@ class FeatureRDDSuite extends ADAMFunSuite { assert(rdd3.dataset.count === 4) } + sparkTest("load partitioned parquet to sql, save, re-read from avro") { + def testMetadata(fRdd: FeatureRDD) { + val sequenceRdd = fRdd.addSequence(SequenceRecord("aSequence", 1000L)) + assert(sequenceRdd.sequences.containsRefName("aSequence")) + } + + val inputPath = testFile("small.1.bed") + val outputPath = tmpLocation() + val rrdd = sc.loadFeatures(inputPath) + testMetadata(rrdd) + val rdd = rrdd.transformDataset(ds => ds) // no-op but force to ds + testMetadata(rdd) + rdd.saveAsPartitionedParquet(outputPath) + val rdd2 = sc.loadPartitionedParquetFeatures(outputPath) + testMetadata(rdd2) + val outputPath2 = tmpLocation() + rdd.transform(rdd => rdd) // no-op but force to rdd + .saveAsPartitionedParquet(outputPath2) + val rdd3 = sc.loadPartitionedParquetFeatures(outputPath2) + assert(rdd3.rdd.count === 4) + assert(rdd3.dataset.count === 4) + } + sparkTest("transform features to contig rdd") { val features = sc.loadFeatures(testFile("sample_coverage.bed")) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala index 674186b7c0..d2058e0b3e 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala @@ -638,6 +638,41 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite { assert(rdd3.dataset.count === 20) } + sparkTest("load from sam, save as partitioned parquet, and re-read from partitioned parquet") { + def testMetadata(arRdd: AlignmentRecordRDD) { + val sequenceRdd = arRdd.addSequence(SequenceRecord("aSequence", 1000L)) + assert(sequenceRdd.sequences.containsRefName("aSequence")) + + val rgRdd = arRdd.addRecordGroup(RecordGroup("test", "aRg")) + assert(rgRdd.recordGroups("aRg").sample === "test") + } + + val inputPath = testFile("multi_chr.sam") + val outputPath = tmpLocation() + val rrdd = sc.loadAlignments(inputPath) + testMetadata(rrdd) +
rrdd.saveAsPartitionedParquet(outputPath, partitionSize = 1000000) + val rdd2 = sc.loadPartitionedParquetAlignments(outputPath) + testMetadata(rdd2) + assert(rdd2.rdd.count === 3) + assert(rdd2.dataset.count === 3) + + val rdd3 = sc.loadPartitionedParquetAlignments(outputPath, List(ReferenceRegion("1", 240000000L, 241000000L), ReferenceRegion("2", 189000000L, 190000000L))) + assert(rdd3.dataset.count === 2) + assert(rdd3.rdd.count === 2) + + // test explicit transform to AlignmentRecordProduct + val rdd = rrdd.transformDataset(ds => { + import ds.sqlContext.implicits._ + val df = ds.toDF() + df.as[AlignmentRecordProduct] + }) + testMetadata(rdd) + assert(rdd.dataset.count === 3) + assert(rdd.rdd.count === 3) + } + sparkTest("save as SAM format") { val inputPath = testFile("small.sam") val reads: AlignmentRecordRDD = sc.loadAlignments(inputPath) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala index 06203abe82..b156e1f585 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala @@ -128,6 +128,14 @@ class GenotypeRDDSuite extends ADAMFunSuite { assert(starts(752790L)) } + sparkTest("round trip to partitioned parquet") { + val genotypes = sc.loadGenotypes(testFile("small.vcf")) + val outputPath = tmpLocation() + genotypes.saveAsPartitionedParquet(outputPath) + val unfilteredGenotypes = sc.loadPartitionedParquetGenotypes(outputPath) + assert(unfilteredGenotypes.rdd.count === 18) + } + sparkTest("use broadcast join to pull down genotypes mapped to targets") { val genotypesPath = testFile("small.vcf") val targetsPath = testFile("small.1.bed") diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala index 4e258ddc8b..c1957337b7 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/VariantRDDSuite.scala @@ -147,6 +147,21 @@ class VariantRDDSuite extends ADAMFunSuite { assert(starts(752790L)) } + sparkTest("save and reload from partitioned parquet") { + val variants = sc.loadVariants(testFile("sorted-variants.vcf")) + val outputPath = tmpLocation() + variants.saveAsPartitionedParquet(outputPath, partitionSize = 1000000) + val unfilteredVariants = sc.loadPartitionedParquetVariants(outputPath) + assert(unfilteredVariants.rdd.count === 6) + assert(unfilteredVariants.dataset.count === 6) + + val regionsVariants = sc.loadPartitionedParquetVariants(outputPath, + List(ReferenceRegion("2", 19000L, 21000L), + ReferenceRegion("13", 752700L, 752750L))) + assert(regionsVariants.rdd.count === 2) + assert(regionsVariants.dataset.count === 2) + } + sparkTest("use broadcast join to pull down variants mapped to targets") { val variantsPath = testFile("small.vcf") val targetsPath = testFile("small.1.bed") From c85bdae7a6857ea47badcdf9468539e6972076cd Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Fri, 16 Feb 2018 20:21:58 -0500 Subject: [PATCH 2/5] Save partition size to isPartitioned flag file and load as metadata --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 55 +++++++++---------- .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 18 +++--- .../contig/NucleotideContigFragmentRDD.scala | 10 ++-- .../adam/rdd/feature/FeatureRDD.scala | 11 ++--
.../adam/rdd/fragment/FragmentRDD.scala | 10 ++-- .../adam/rdd/read/AlignmentRecordRDD.scala | 10 ++-- .../adam/rdd/variant/GenotypeRDD.scala | 10 ++-- .../adam/rdd/variant/VariantRDD.scala | 10 ++-- 8 files changed, 71 insertions(+), 63 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 6b9f04a39a..7e1f0f0bdb 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -1882,13 +1882,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log def loadPartitionedParquetAlignments(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = { - require(isPartitioned(pathName), - "Input Parquet files (%s) are not partitioned.".format(pathName)) - + val partitionedBinSize = getPartitionedBinSize(pathName) val reads = loadParquetAlignments(pathName) val datasetBoundAlignmentRecordRDD = if (regions.nonEmpty) { - DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps) + DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize)) .filterByOverlappingRegions(regions) } else { - DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps) + DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize)) @@ -2291,16 +2289,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log def loadPartitionedParquetGenotypes(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): GenotypeRDD = { - require(isPartitioned(pathName), - "Input Parquet files (%s) are not partitioned.".format(pathName)) - + val partitionedBinSize = getPartitionedBinSize(pathName) val genotypes = loadParquetGenotypes(pathName) val datasetBoundGenotypeRDD = if (regions.nonEmpty) { - DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines) + DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize)) .filterByOverlappingRegions(regions) } else { - DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines) + DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize)) } datasetBoundGenotypeRDD @@ -2350,16 +2346,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log def loadPartitionedParquetVariants(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): VariantRDD = { - require(isPartitioned(pathName), - "Input Parquet files (%s) are not partitioned.".format(pathName)) - + val partitionedBinSize = getPartitionedBinSize(pathName) val variants = loadParquetVariants(pathName) val datasetBoundVariantRDD = if (regions.nonEmpty) { - DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines) + DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines, Some(partitionedBinSize)) .filterByOverlappingRegions(regions) } else { - DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines) + DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines, Some(partitionedBinSize)) } datasetBoundVariantRDD @@ -2692,16 +2686,14 @@ class ADAMContext(@transient val sc:
SparkContext) extends Serializable with Log def loadPartitionedParquetFeatures(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): FeatureRDD = { - require(isPartitioned(pathName), - "Input Parquet files (%s) are not partitioned.".format(pathName)) - + val partitionedBinSize = getPartitionedBinSize(pathName) val features = loadParquetFeatures(pathName) val datasetBoundFeatureRDD = if (regions.nonEmpty) { - DatasetBoundFeatureRDD(features.dataset, features.sequences) + DatasetBoundFeatureRDD(features.dataset, features.sequences, Some(partitionedBinSize)) .filterByOverlappingRegions(regions) } else { - DatasetBoundFeatureRDD(features.dataset, features.sequences) + DatasetBoundFeatureRDD(features.dataset, features.sequences, Some(partitionedBinSize)) } datasetBoundFeatureRDD @@ -2750,16 +2742,14 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log def loadPartitionedParquetContigFragments(pathName: String, regions: Iterable[ReferenceRegion] = Iterable.empty): NucleotideContigFragmentRDD = { - require(isPartitioned(pathName), - "Input Parquet files (%s) are not partitioned.".format(pathName)) - + val partitionedBinSize = getPartitionedBinSize(pathName) val contigs = loadParquetContigFragments(pathName) val datasetBoundNucleotideContigFragmentRDD = if (regions.nonEmpty) { - DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences) + DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences, Some(partitionedBinSize)) .filterByOverlappingRegions(regions) } else { - DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences) + DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences, Some(partitionedBinSize)) } datasetBoundNucleotideContigFragmentRDD @@ -3192,16 +3182,21 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * * @param pathName Path in which to look for partitioned flag. - * @return Return true if the specified path of Parquet + Avro files is partitioned. - * Behavior is undefined if some paths in the glob contain the partitioned flag and some do not. + * @return Returns the partition bin size, in base pairs, that the partitioned Parquet + Avro files were saved with. + * + * If a glob is used, all directories within the glob must be partitioned, and must have been saved + * using the same partitioned bin size. Behavior is undefined if this requirement is not met.
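+ * + * A usage sketch (illustrative path; throws FileNotFoundException when the flag file is absent): + * {{{ + * val binSize = sc.getPartitionedBinSize("sample.reads.adam") + * }}}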
*/ - def isPartitioned(pathName: String): Boolean = { + def getPartitionedBinSize(pathName: String): Int = { - try { - getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")) - true + val partitionSize = try { + val f = getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")).head + f.getFileSystem(sc.hadoopConfiguration).open(f).readInt() } catch { - case e: FileNotFoundException => false + case e: FileNotFoundException => { + throw new FileNotFoundException("Input Parquet files (%s) are not partitioned.".format(pathName)) + } } + partitionSize } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 2c880832db..faa199ff49 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -20,7 +20,7 @@ package org.bdgenomics.adam.rdd import java.nio.file.Paths import htsjdk.samtools.ValidationStringency import org.apache.avro.generic.IndexedRecord -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{ FSDataOutputStream, FileSystem, Path } import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.SparkFiles import org.apache.spark.api.java.JavaRDD @@ -2268,14 +2268,12 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Geno * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundGenomicRDD with ReferenceRegions filter applied. */ - def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], - optPartitionSize: Option[Int] = Some(1000000), - optPartitionedLookBackNum: Option[Int] = Some(1)): V = { + def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): V = { filterByOverlappingRegions(querys) } } @@ -2582,10 +2580,12 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A * * @param filePath Path to save the file at.
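+ * @param partitionSize Partition bin size, in base pairs, which is written into the flag file as a single int.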
*/ - private def writePartitionedParquetFlag(filePath: String): Boolean = { + private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = { val path = new Path(filePath, "_isPartitionedByStartPos") - val fs = path.getFileSystem(rdd.context.hadoopConfiguration) - fs.createNewFile(path) + val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration) + val f = fs.create(path) + f.writeInt(partitionSize) + f.close() } /** @@ -2607,7 +2607,7 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A .format("parquet") .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase()) .save(filePath) - writePartitionedParquetFlag(filePath) + writePartitionedParquetFlag(filePath, partitionSize) saveMetadata(filePath) } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala index e6be5f35f1..d22be43bf5 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala @@ -133,7 +133,8 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] ( case class DatasetBoundNucleotideContigFragmentRDD private[rdd] ( dataset: Dataset[NucleotideContigFragmentProduct], - sequences: SequenceDictionary) extends NucleotideContigFragmentRDD { + sequences: SequenceDictionary, + partitionedBinSize: Option[Int] = None) extends NucleotideContigFragmentRDD { lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro) @@ -166,13 +167,14 @@ case class DatasetBoundNucleotideContigFragmentRDD private[rdd] ( * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundNucleotideContigFragmentRDD with ReferenceRegions filter applied.
*/ - override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): NucleotideContigFragmentRDD = { - transformDataset(((d: Dataset[org.bdgenomics.adam.sql.NucleotideContigFragment]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): NucleotideContigFragmentRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.NucleotideContigFragment]) => + d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get))) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala index 56e5b8d2a9..3030b5d643 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala @@ -275,7 +275,8 @@ case class ParquetUnboundFeatureRDD private[rdd] ( case class DatasetBoundFeatureRDD private[rdd] ( dataset: Dataset[FeatureProduct], - sequences: SequenceDictionary) extends FeatureRDD { + sequences: SequenceDictionary, + partitionedBinSize: Option[Int] = None) extends FeatureRDD { lazy val rdd = dataset.rdd.map(_.toAvro) protected lazy val optPartitionMap = None @@ -315,13 +316,15 @@ case class DatasetBoundFeatureRDD private[rdd] ( * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundFeatureRDD with ReferenceRegions filter applied.
*/ - override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): FeatureRDD = { - transformDataset(((d: Dataset[org.bdgenomics.adam.sql.Feature]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): FeatureRDD = { + + transformDataset((d: Dataset[org.bdgenomics.adam.sql.Feature]) => + d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get))) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala index d085e13c33..fb92ad51b2 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala @@ -183,7 +183,8 @@ case class DatasetBoundFragmentRDD private[rdd] ( dataset: Dataset[FragmentProduct], sequences: SequenceDictionary, recordGroups: RecordGroupDictionary, - @transient val processingSteps: Seq[ProcessingStep]) extends FragmentRDD { + @transient val processingSteps: Seq[ProcessingStep], + partitionedBinSize: Option[Int] = None) extends FragmentRDD { lazy val rdd = dataset.rdd.map(_.toAvro) @@ -227,13 +228,14 @@ case class DatasetBoundFragmentRDD private[rdd] ( * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundFragmentRDD with ReferenceRegions filter applied.
*/ - override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): FragmentRDD = { - transformDataset(((d: Dataset[org.bdgenomics.adam.sql.Fragment]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): FragmentRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.Fragment]) => + d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get))) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 4f4df0c234..0bf9dcac32 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -242,7 +242,8 @@ case class DatasetBoundAlignmentRecordRDD private[rdd] ( dataset: Dataset[AlignmentRecordProduct], sequences: SequenceDictionary, recordGroups: RecordGroupDictionary, - @transient val processingSteps: Seq[ProcessingStep]) extends AlignmentRecordRDD { + @transient val processingSteps: Seq[ProcessingStep], + partitionedBinSize: Option[Int] = None) extends AlignmentRecordRDD { lazy val rdd = dataset.rdd.map(_.toAvro) @@ -286,13 +287,14 @@ case class DatasetBoundAlignmentRecordRDD private[rdd] ( * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundAlignmentRecordRDD with ReferenceRegions filter applied.
*/ - override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): AlignmentRecordRDD = { - transformDataset(((d: Dataset[org.bdgenomics.adam.sql.AlignmentRecord]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get)))) + override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): AlignmentRecordRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.AlignmentRecord]) => + d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get))) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala index cf70606b08..2e4a2548f3 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala @@ -141,7 +141,8 @@ case class DatasetBoundGenotypeRDD private[rdd] ( dataset: Dataset[GenotypeProduct], sequences: SequenceDictionary, @transient samples: Seq[Sample], - @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends GenotypeRDD { + @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines, + partitionedBinSize: Option[Int] = None) extends GenotypeRDD { protected lazy val optPartitionMap = None @@ -183,13 +184,14 @@ case class DatasetBoundGenotypeRDD private[rdd] ( * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundGenotypeRDD with ReferenceRegions filter applied.
*/ - override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): GenotypeRDD = { - transformDataset((d: Dataset[org.bdgenomics.adam.sql.Genotype]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get))) + override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): GenotypeRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.Genotype]) => + d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get))) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala index 2dc1fa946f..5aa53b6af6 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala @@ -134,7 +134,8 @@ case class ParquetUnboundVariantRDD private[rdd] ( case class DatasetBoundVariantRDD private[rdd] ( dataset: Dataset[VariantProduct], sequences: SequenceDictionary, - @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines) extends VariantRDD { + @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines, + partitionedBinSize: Option[Int] = None) extends VariantRDD { protected lazy val optPartitionMap = None @@ -172,13 +173,14 @@ case class DatasetBoundVariantRDD private[rdd] ( * Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions. * * @param querys ReferenceRegions to filter against - * @param optPartitionSize Optional partitionSize used for partitioned Parquet, defaults to 1000000. * @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a * ReferenceRegion, defaults to 1 * @return Returns a new DatasetBoundVariantRDD with ReferenceRegions filter applied.
*/ - override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion], optPartitionSize: Option[Int] = Some(1000000), optPartitionedLookBackNum: Option[Int] = Some(1)): VariantRDD = { - transformDataset((d: Dataset[org.bdgenomics.adam.sql.Variant]) => d.filter(referenceRegionsToDatasetQueryString(querys, optPartitionSize.get, optPartitionedLookBackNum.get))) + override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion], + optPartitionedLookBackNum: Option[Int] = Some(1)): VariantRDD = { + transformDataset((d: Dataset[org.bdgenomics.adam.sql.Variant]) => + d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get))) } } From f69adb5e8b26a0c0c8ca12b745251314058d1dad Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Fri, 16 Feb 2018 21:46:24 -0500 Subject: [PATCH 3/5] Added CLI option to save as parquet dataset --- .../org/bdgenomics/adam/cli/TransformAlignments.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala index dfc332d081..a89ec8046f 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala @@ -131,6 +131,8 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu var storageLevel: String = "MEMORY_ONLY" @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.") var disableProcessingStep = false + @Args4jOption(required = false, name = "-save_as_dataset", usage = "EXPERIMENTAL: Save as a Parquet format Spark-SQL dataset") + var saveAsDataset = false var command: String = null } @@ -562,8 +564,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B mergedSd } - outputRdd.save(args, - isSorted = args.sortReads || args.sortLexicographically) + if (args.saveAsDataset) { + outputRdd.saveAsPartitionedParquet(args.outputPath) + } else { + outputRdd.save(args, + isSorted = args.sortReads || args.sortLexicographically) + } } private def createKnownSnpsTable(sc: SparkContext): SnpTable = { From 8b63059acdb624833ccdb7c8f6330964cb26eb73 Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Fri, 16 Feb 2018 22:04:52 -0500 Subject: [PATCH 4/5] nit --- .../scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index 0bf9dcac32..d6923b27ce 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -26,7 +26,7 @@ import java.nio.file.Paths import org.apache.hadoop.fs.Path import org.apache.hadoop.io.LongWritable import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.spark.{ SparkContext } +import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.MetricsContext._ From 5c29dde556bed09a2a6d88703a40730d6752754c Mon Sep 17 00:00:00 2001 From: Justin Paschall Date: Sat, 17 Feb 2018 10:24:24 -0500 Subject: [PATCH 5/5] fixed missing extends typo in DatasetBoundAlignmentRecordRDD ---
.../scala/org/bdgenomics/adam/rdd/GenomicRDD.scala | 1 + .../rdd/contig/NucleotideContigFragmentRDD.scala | 10 ++++------ .../org/bdgenomics/adam/rdd/feature/FeatureRDD.scala | 8 ++++---- .../bdgenomics/adam/rdd/fragment/FragmentRDD.scala | 12 ++++++------ .../adam/rdd/read/AlignmentRecordRDD.scala | 12 ++++++------ .../bdgenomics/adam/rdd/variant/GenotypeRDD.scala | 12 ++++++------ .../org/bdgenomics/adam/rdd/variant/VariantRDD.scala | 10 +++++----- 7 files changed, 32 insertions(+), 33 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index d3ac297d99..3cc050ccae 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -2560,6 +2560,7 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A "\' and (end > " + r.start + " and start < " + r.end + "))") .mkString(" or ") } + } /** diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala index 9d5edae566..bf349b0739 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala @@ -136,12 +136,10 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] ( } case class DatasetBoundNucleotideContigFragmentRDD private[rdd] ( - dataset: Dataset[NucleotideContigFragmentProduct], - sequences: SequenceDictionary, - partitionedBinSize: Option[Int] = None) extends NucleotideContigFragmentRDD - with DatasetBoundGenomicDataset[NucleotideContigFragment, - NucleotideContigFragmentProduct, - NucleotideContigFragmentRDD] { + dataset: Dataset[NucleotideContigFragmentProduct], + sequences: SequenceDictionary, + partitionedBinSize: Option[Int] = None) extends NucleotideContigFragmentRDD + with DatasetBoundGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] { lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro) protected lazy val optPartitionMap = None diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala index 946dab3404..04b1149c95 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala @@ -275,10 +275,10 @@ case class ParquetUnboundFeatureRDD private[rdd] ( } case class DatasetBoundFeatureRDD private[rdd] ( - dataset: Dataset[FeatureProduct], - sequences: SequenceDictionary, - partitionedBinSize: Option[Int] = None) extends FeatureRDD - with DatasetBoundGenomicDataset[Feature, FeatureProduct, FeatureRDD] { + dataset: Dataset[FeatureProduct], + sequences: SequenceDictionary, + partitionedBinSize: Option[Int] = None) extends FeatureRDD + with DatasetBoundGenomicDataset[Feature, FeatureProduct, FeatureRDD] { lazy val rdd = dataset.rdd.map(_.toAvro) protected lazy val optPartitionMap = None diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala index 77d2c22145..cb6301b2d3 100644 --- 
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
index 77d2c22145..cb6301b2d3 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
@@ -184,12 +184,12 @@ case class ParquetUnboundFragmentRDD private[rdd] (
 }
 
 case class DatasetBoundFragmentRDD private[rdd] (
-  dataset: Dataset[FragmentProduct],
-  sequences: SequenceDictionary,
-  recordGroups: RecordGroupDictionary,
-  @transient val processingSteps: Seq[ProcessingStep],
-  partitionedBinSize: Option[Int] = None) extends FragmentRDD
-  with DatasetBoundGenomicDataset[Fragment, FragmentProduct, FragmentRDD] {
+    dataset: Dataset[FragmentProduct],
+    sequences: SequenceDictionary,
+    recordGroups: RecordGroupDictionary,
+    @transient val processingSteps: Seq[ProcessingStep],
+    partitionedBinSize: Option[Int] = None) extends FragmentRDD
+    with DatasetBoundGenomicDataset[Fragment, FragmentProduct, FragmentRDD] {
 
   lazy val rdd = dataset.rdd.map(_.toAvro)
 
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
index d86668ca03..1c1c8831b7 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
@@ -234,12 +234,12 @@ case class ParquetUnboundAlignmentRecordRDD private[rdd] (
 }
 
 case class DatasetBoundAlignmentRecordRDD private[rdd] (
-  dataset: Dataset[AlignmentRecordProduct],
-  sequences: SequenceDictionary,
-  recordGroups: RecordGroupDictionary,
-  @transient val processingSteps: Seq[ProcessingStep],
-  partitionedBinSize: Option[Int] = None) extends
-  with DatasetBoundGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] {
+    dataset: Dataset[AlignmentRecordProduct],
+    sequences: SequenceDictionary,
+    recordGroups: RecordGroupDictionary,
+    @transient val processingSteps: Seq[ProcessingStep],
+    partitionedBinSize: Option[Int] = None) extends AlignmentRecordRDD
+    with DatasetBoundGenomicDataset[AlignmentRecord, AlignmentRecordProduct, AlignmentRecordRDD] {
 
   lazy val rdd = dataset.rdd.map(_.toAvro)
 
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
index 87bdfd8c93..5e1251c2c1 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala
@@ -142,12 +142,12 @@ case class ParquetUnboundGenotypeRDD private[rdd] (
 }
 
 case class DatasetBoundGenotypeRDD private[rdd] (
-  dataset: Dataset[GenotypeProduct],
-  sequences: SequenceDictionary,
-  @transient samples: Seq[Sample],
-  @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines,
-  partitionedBinSize: Option[Int] = None) extends GenotypeRDD
-  with DatasetBoundGenomicDataset[Genotype, GenotypeProduct, GenotypeRDD] {
+    dataset: Dataset[GenotypeProduct],
+    sequences: SequenceDictionary,
+    @transient samples: Seq[Sample],
+    @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines,
+    partitionedBinSize: Option[Int] = None) extends GenotypeRDD
+    with DatasetBoundGenomicDataset[Genotype, GenotypeProduct, GenotypeRDD] {
 
   protected lazy val optPartitionMap = None
 
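The DatasetBound*RDD case classes above all share one shape: wrap a Spark SQL Dataset plus its metadata, mix in DatasetBoundGenomicDataset, and default partitionedBinSize to None so nothing assumes partitioned input. Because the constructors are private[rdd], a sketch of reaching the dataset-bound representation from user code, with a placeholder path, would go through a dataset transform:

    import org.bdgenomics.adam.rdd.ADAMContext._

    val reads = sc.loadAlignments("sample.alignments.adam")

    // An identity transformDataset is enough to switch to the dataset-bound
    // representation, so later region filters run as Spark SQL predicates.
    val datasetBound = reads.transformDataset(ds => ds)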
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
index 8c81f48ef1..37dc142c61 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantRDD.scala
@@ -133,11 +133,11 @@ case class ParquetUnboundVariantRDD private[rdd] (
 }
 
 case class DatasetBoundVariantRDD private[rdd] (
-  dataset: Dataset[VariantProduct],
-  sequences: SequenceDictionary,
-  @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines,
-  partitionedBinSize: Option[Int] = None) extends VariantRDD
-  with DatasetBoundGenomicDataset[Variant, VariantProduct, VariantRDD] {
+    dataset: Dataset[VariantProduct],
+    sequences: SequenceDictionary,
+    @transient headerLines: Seq[VCFHeaderLine] = DefaultHeaderLines.allHeaderLines,
+    partitionedBinSize: Option[Int] = None) extends VariantRDD
+    with DatasetBoundGenomicDataset[Variant, VariantProduct, VariantRDD] {
 
   protected lazy val optPartitionMap = None
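End to end, the series turns region queries over position-partitioned Parquet into dataset-level predicates. A usage sketch, assuming the partitioned loaders added earlier in the series; the path and coordinates are placeholders:

    import org.bdgenomics.adam.models.ReferenceRegion
    import org.bdgenomics.adam.rdd.ADAMContext._

    val region = ReferenceRegion("chr1", 100000L, 200000L)

    // Load only the partitions that can overlap the region...
    val variants = sc.loadPartitionedParquetVariants("variants.adam", Seq(region))

    // ...or filter an already-loaded dataset-bound RDD directly, as in the
    // VariantRDD hunk at the start of this section.
    val overlapping = variants.filterDatasetByOverlappingRegions(Seq(region))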