
[ADAM-651] Hive-style partitioning of parquet files by genomic position #1911

Closed. Wants to merge 6 commits.
@@ -131,6 +131,8 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false
@Args4jOption(required = false, name = "-save_as_dataset", usage = "EXPERIMENTAL: Save as a Parquet format Spark-SQL dataset")
Member:
The doc on this is a bit misleading. The partitioning is enabled by Spark SQL, but is not really Spark SQL specific. I'd say that providing this flag saves the data partitioned by genomic locus using Hive-style partitioning.
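For readers unfamiliar with the term, here is a minimal sketch of what Hive-style partitioning by genomic locus means, assuming a Spark SQL DataFrame with contigName and start columns (the column names and default bin size mirror those used later in this diff; the output path is made up, and this snippet is not part of the PR):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.floor

// Each distinct (contigName, positionBin) pair becomes its own directory, e.g.
// sample.alignments.adam/contigName=chr1/positionBin=12/part-*.parquet,
// so a region query can prune whole directories instead of scanning every row group.
def sketchPartitionedWrite(df: DataFrame, binSize: Int = 1000000): Unit = {
  df.withColumn("positionBin", floor(df("start") / binSize))
    .write
    .partitionBy("contigName", "positionBin")
    .parquet("sample.alignments.adam")
}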

Member:
Also, "Spark SQL" vs "Spark-SQL".

Member Author:
Resolved - and changed parameter to an integer so user can specify bin size.

var saveAsDataset = false

var command: String = null
}
@@ -562,8 +564,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
mergedSd
}

outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
if (args.saveAsDataset) {
outputRdd.saveAsPartitionedParquet(args.outputPath)
Member:
A bit irrelevant to this line, but OOC, how does saveAsPartitionedParquet handle data that isn't aligned? It seems like we may want to warn if there isn't a sequence dictionary attached.

Member Author:
It will save all data to partition bin number 0, which provides no benefit.
A warning to that effect has been added with log.warn.
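A hedged sketch of that guard, not part of this diff (the exact message, and the assumption that SequenceDictionary exposes isEmpty, are illustrative):

// Unaligned reads carry no usable position, so every record falls into
// positionBin 0 and Hive-style partition pruning provides no benefit.
if (outputRdd.sequences.isEmpty) {
  log.warn("Input has no sequence dictionary; saving as partitioned Parquet " +
    "will place all records in a single partition bin.")
}
outputRdd.saveAsPartitionedParquet(args.outputPath)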

} else {
outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
}
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = {
148 changes: 148 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1866,6 +1866,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into an AlignmentRecordRDD.
*
* @note The sequence dictionary is read from an Avro file stored at
* pathName/_seqdict.avro and the record group dictionary is read from an
* Avro file stored at pathName/_rgdict.avro. These files are pure Avro,
* not Parquet + Avro.
*
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val reads = loadParquetAlignments(pathName)

val datasetBoundAlignmentRecordRDD = if (regions.nonEmpty) {
DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

.filterByOverlappingRegions(regions)
} else {
DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
Member:
Nit: Long line, should be broken up.

}

datasetBoundAlignmentRecordRDD
}
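A hedged usage sketch of the loader above, not part of this diff; it assumes the ADAMContext implicits are in scope and that the path was previously written with saveAsPartitionedParquet (path and coordinates are made up):

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._

// Only Parquet directories whose contigName/positionBin values can overlap the
// region are scanned; the residual start/end overlap test is applied by Spark SQL.
val region = ReferenceRegion("chr1", 100000L, 200000L)
val reads = sc.loadPartitionedParquetAlignments("sample.alignments.adam", Iterable(region))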

/**
* Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
*
@@ -2249,6 +2278,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a GenotypeRDD.
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): GenotypeRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val genotypes = loadParquetGenotypes(pathName)

val datasetBoundGenotypeRDD = if (regions.nonEmpty) {
DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

.filterByOverlappingRegions(regions)
} else {
DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

}

datasetBoundGenotypeRDD
}

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
@@ -2282,6 +2335,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a VariantRDD.
*
* @param pathName The path name to load variants from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a VariantRDD.
*/
def loadPartitionedParquetVariants(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): VariantRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val variants = loadParquetVariants(pathName)

val datasetBoundVariantRDD = if (regions.nonEmpty) {
DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

.filterByOverlappingRegions(regions)
} else {
DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

}

datasetBoundVariantRDD
}

/**
* Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
*
@@ -2598,6 +2675,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a FeatureRDD.
*
* @param pathName The path name to load features from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a FeatureRDD.
*/
def loadPartitionedParquetFeatures(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): FeatureRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val features = loadParquetFeatures(pathName)

val datasetBoundFeatureRDD = if (regions.nonEmpty) {
DatasetBoundFeatureRDD(features.dataset, features.sequences, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

.filterByOverlappingRegions(regions)
} else {
DatasetBoundFeatureRDD(features.dataset, features.sequences, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

}

datasetBoundFeatureRDD
}

/**
* Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
*
@@ -2630,6 +2731,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name with range binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD.
*
* @param pathName The path name to load nucleotide contig fragments from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadPartitionedParquetContigFragments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): NucleotideContigFragmentRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val contigs = loadParquetContigFragments(pathName)

val datasetBoundNucleotideContigFragmentRDD = if (regions.nonEmpty) {
DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

.filterByOverlappingRegions(regions)
} else {
DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences, Some(partitionedBinSize))
Member:
Nit: Long line, should be broken up.

}

datasetBoundNucleotideContigFragmentRDD
}

/**
* Load a path name in Parquet + Avro format into a FragmentRDD.
*
@@ -3051,4 +3176,27 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

/**
* Return the partition bin size of the partitioned Parquet + Avro files at the specified path.
*
* @param pathName Path in which to look for the partitioned flag file.
* @return Returns the partition bin size recorded in the _isPartitionedByStartPos file.
*
* If a glob is used, all directories within the glob must be partitioned, and must have been saved
* using the same partitioned bin size. Behavior is undefined if this requirement is not met.
*/
def getPartitionedBinSize(pathName: String): Int = {
Member:
This function should be private, or at least private[rdd].

Member Author:
Done, but I had to add a separate public isPartitioned function because Mango needs to be able to check whether data is partitioned.
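A hedged sketch of what that public helper might look like, not part of this diff (the name isPartitioned comes from the comment above; the one-line body is an assumption built on getPartitionedBinSize):

import scala.util.Try

// Returns true if a _isPartitionedByStartPos flag file can be read from the path,
// i.e. if getPartitionedBinSize would succeed.
def isPartitioned(pathName: String): Boolean = {
  Try(getPartitionedBinSize(pathName)).isSuccess
}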


val partitionSize = try {
val f = getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")).head
f.getFileSystem(sc.hadoopConfiguration).open(f).readInt()
Member:
You need to close the stream after you open it. E.g.:

val is = f.getFileSystem(sc.hadoopConfiguration).open(f)
val bucketSize = is.readInt()
is.close()

Additionally, I'd prefer that we enforced that all partitioned files had the same bin size. E.g.,

val partitionSizes = getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")).map(f => {
    val is = f.getFileSystem(sc.hadoopConfiguration).open(f)
    val partitionSize = is.readInt()
    is.close()
    partitionSize
  }).toSet

require(partitionSizes.nonEmpty, "Input Parquet files (%s) are not partitioned.".format(pathName))
require(partitionSizes.size == 1, "Found multiple partition sizes (%s).".format(partitionSizes.mkString(", ")))

partitionSizes.head

You shouldn't ever actually get a FileNotFoundException; if there's no _isPartitionedByStartPos files, then getFsAndFilesWithFilter shouldn't return any files.

Member Author:
Done

} catch {
case e: FileNotFoundException => {
throw new FileNotFoundException("Input Parquet files (%s) are not partitioned.".format(pathName))
}
}
partitionSize
}

}
@@ -20,14 +20,15 @@ package org.bdgenomics.adam.rdd
import java.nio.file.Paths
import htsjdk.samtools.ValidationStringency
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{ FSDataOutputStream, FileSystem, Path }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -54,6 +55,7 @@ import scala.collection.JavaConversions._
import scala.math.min
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

private[rdd] class JavaSaveArgs(var outputPath: String,
var blockSize: Int = 128 * 1024 * 1024,
@@ -2324,6 +2326,19 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Geno
convFn.call(v, dsX)
})
}

/**
* Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions.
*
* @param querys ReferenceRegions to filter against
* @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a
* ReferenceRegion, defaults to 1
* @return Returns a new DatasetBoundGenomicRDD with ReferenceRegions filter applied.
*/
def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
Member:
I'd looove this to move into the new DatasetBoundGenomicDataset trait which came in @akmorrow13's 6051321, and then this would override filterByOverlappingRegions. However, this has a slightly different signature due to the addition of optPartitionedLookBackNum. I propose that we either:

  • Move this into DatasetBoundGenomicDataset and make this method protected, and then override filterByOverlappingRegions(querys) to call filterDatasetByOverlappingRegions(querys).
  • Or, keep this as is now but open a ticket to close up this API in 0.25.0.

Thoughts?

Unrelated nit: querys should be queries.
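A hedged sketch of the first option, not part of this diff: the partition-aware filter becomes protected and the existing public method delegates to it (trait and method names follow the comment; the bodies are illustrative):

trait DatasetBoundGenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]]
    extends GenomicDataset[T, U, V] {

  // Partition-aware filtering stays out of the public API.
  protected def filterDatasetByOverlappingRegions(
    querys: Iterable[ReferenceRegion],
    optPartitionedLookBackNum: Option[Int] = Some(1)): V

  // The public method keeps its existing signature and simply delegates.
  override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion]): V = {
    filterDatasetByOverlappingRegions(querys)
  }
}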

Member Author:
Alternatively, on further thought I don't like optPartitionedLookBackNum as a parameter to the filter-by-overlap function anyway: it is a configuration value tied to the dataset, just like partitionedBinSize, that should only need to be set once when the dataset is created. We could instead add it as another optional parameter to the DatasetBound* constructors.

If we did that, the optPartitionedLookBackNum parameter could go away, and the filterByOverlappingRegions override in DatasetBoundGenomicDataset would work cleanly.

Member:
SGTM!
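A hedged sketch of the constructor-parameter alternative discussed above, using DatasetBoundFeatureRDD as the example and eliding unchanged members (field names are assumptions, not the merged code):

case class DatasetBoundFeatureRDD private[rdd] (
    dataset: Dataset[FeatureProduct],
    sequences: SequenceDictionary,
    partitionedBinSize: Option[Int] = None,
    partitionedLookBackNum: Int = 1) extends FeatureRDD
    with DatasetBoundGenomicDataset[Feature, FeatureProduct, FeatureRDD] {

  // ... existing members unchanged ...

  // The look-back is fixed at construction time, like the bin size, so the
  // public filter keeps its existing signature.
  override def filterByOverlappingRegions(querys: Iterable[ReferenceRegion]): FeatureRDD = {
    transformDataset(d => d.filter(
      referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, partitionedLookBackNum)))
  }
}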

optPartitionedLookBackNum: Option[Int] = Some(1)): V = {
filterByOverlappingRegions(querys)
}
}

trait GenomicRDDWithLineage[T, U <: GenomicRDDWithLineage[T, U]] extends GenomicRDD[T, U] {
@@ -2531,6 +2546,23 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
saveSequences(filePath)
}

protected def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000, partitionedLookBackNum: Int = 1): String = {
Member:
I see that a lot of downstream classes that implement GenomicDataset wind up overriding filterDatasetByOverlappingRegions with transformDataset(ds => ds.filter(referenceRegionsToDatasetQueryString(...))). I get why they have to do this, and I think fixing it is out of scope for this PR. However, can you open a ticket for us to move that code into a trait in a future release?

Member Author:
In the update, filterDatasetByOverlappingRegions is implemented generically in the DatasetBoundGenomicDataset trait, so there is no more overriding.


// Test whether this dataset is bound to partitioned Parquet by checking for the positionBin field
if (Try(dataset("positionBin")).isSuccess) {
regions.map(r => "(contigName=" + "\'" + r.referenceName +
"\' and positionBin >= \'" + ((scala.math.floor(r.start / partitionSize).toInt) - partitionedLookBackNum) +
"\' and positionBin < \'" + (scala.math.floor(r.end / partitionSize).toInt + 1) +
"\' and (end > " + r.start + " and start < " + r.end + "))")
.mkString(" or ")
} else { // if no positionBin field is found then construct query without bin optimization
regions.map(r => "(contigName=" + "\'" + r.referenceName +
"\' and (end > " + r.start + " and start < " + r.end + "))")
.mkString(" or ")
}

Member:
Nit: extra whitespace.

}
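A worked example of the string this method builds, not output captured from the PR: for ReferenceRegion("chr1", 12345678L, 12345778L) with the default partitionSize = 1000000 and partitionedLookBackNum = 1, the partition-aware branch yields:

// floor(12345678 / 1000000) = 12, so bins '11' (one bin of look-back) up to but
// excluding '13' are scanned before the exact start/end overlap test.
val query = "(contigName='chr1' and positionBin >= '11' and positionBin < '13'" +
  " and (end > 12345678 and start < 12345778))"

The resulting string is handed to Dataset.filter, so Spark prunes whole positionBin directories before evaluating the overlap predicate.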

/**
* Saves RDD as a directory of Parquet files.
*
@@ -2606,4 +2638,41 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
saveAsParquet(new JavaSaveArgs(filePath))
}

/**
* Writes the _isPartitionedByStartPos flag file recording that this RDD was saved as
* range binned partitioned Parquet + Avro, along with the partition bin size used.
*
* @param filePath Path to the partitioned Parquet directory.
* @param partitionSize Size of the partition bins, in base pairs.
*/
private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = {
val path = new Path(filePath, "_isPartitionedByStartPos")
val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration)
val f = fs.create(path)
f.writeInt(partitionSize)
f.close()
}

/**
* Saves this RDD to disk in range binned partitioned Parquet + Avro format
*
* @param filePath Path to save the file at.
* @param compressCodec Name of the compression codec to use.
* @param partitionSize Size of the partition bins used when writing Parquet, in base pairs. Defaults to 1000000.
*/
def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()
df.withColumn("positionBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "positionBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath, partitionSize)
saveMetadata(filePath)
}
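A hedged end-to-end sketch of the save path, not part of this diff (input path, output path, and bin size are made up; loadAlignments and the implicit ADAMContext conversion are assumed to be in scope):

import org.bdgenomics.adam.rdd.ADAMContext._

val reads = sc.loadAlignments("sample.bam")
// Writes Hive-style directories such as
//   sample.alignments.adam/contigName=chr1/positionBin=0/part-*.gz.parquet
// plus the _isPartitionedByStartPos flag file recording the bin size, and the
// usual Avro metadata written by saveMetadata.
reads.saveAsPartitionedParquet("sample.alignments.adam", partitionSize = 1000000)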

}
@@ -137,9 +137,9 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] (

case class DatasetBoundNucleotideContigFragmentRDD private[rdd] (
dataset: Dataset[NucleotideContigFragmentProduct],
sequences: SequenceDictionary) extends NucleotideContigFragmentRDD
sequences: SequenceDictionary,
partitionedBinSize: Option[Int] = None) extends NucleotideContigFragmentRDD
with DatasetBoundGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] {

lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro)

protected lazy val optPartitionMap = None
@@ -162,6 +162,24 @@ case class DatasetBoundNucleotideContigFragmentRDD private[rdd] (
newSequences: SequenceDictionary): NucleotideContigFragmentRDD = {
copy(sequences = newSequences)
}

override def transformDataset(tFn: Dataset[NucleotideContigFragmentProduct] => Dataset[NucleotideContigFragmentProduct]): NucleotideContigFragmentRDD = {
copy(dataset = tFn(dataset))
}

/**
* Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions.
*
* @param querys ReferenceRegions to filter against
* @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a
* ReferenceRegion, defaults to 1
* @return Returns a new DatasetBoundNucleotideContigFragmentRDD with ReferenceRegions filter applied.
*/
override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
optPartitionedLookBackNum: Option[Int] = Some(1)): NucleotideContigFragmentRDD = {
transformDataset((d: Dataset[org.bdgenomics.adam.sql.NucleotideContigFragment]) =>
d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get)))
}
}

/**
@@ -276,7 +276,8 @@ case class ParquetUnboundFeatureRDD private[rdd] (

case class DatasetBoundFeatureRDD private[rdd] (
dataset: Dataset[FeatureProduct],
sequences: SequenceDictionary) extends FeatureRDD
sequences: SequenceDictionary,
partitionedBinSize: Option[Int] = None) extends FeatureRDD
with DatasetBoundGenomicDataset[Feature, FeatureProduct, FeatureRDD] {

lazy val rdd = dataset.rdd.map(_.toAvro)
@@ -312,6 +313,22 @@ case class DatasetBoundFeatureRDD private[rdd] (
.withColumnRenamed("score", "count")
.as[Coverage], sequences)
}

/**
* Filters and replaces the underlying dataset based on overlap with any of a Seq of ReferenceRegions.
*
* @param querys ReferenceRegions to filter against
* @param optPartitionedLookBackNum Optional number of parquet position bins to look back to find start of a
* ReferenceRegion, defaults to 1
* @return Returns a new DatasetBoundFeatureRDD with ReferenceRegions filter applied.
*/
override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
optPartitionedLookBackNum: Option[Int] = Some(1)): FeatureRDD = {

transformDataset((d: Dataset[org.bdgenomics.adam.sql.Feature]) =>
d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get)))
}

}

case class RDDBoundFeatureRDD private[rdd] (