
Commit

Merge 5c29dde into 67890b8
jpdna authored Feb 17, 2018
2 parents 67890b8 + 5c29dde commit 026b51f
Showing 15 changed files with 447 additions and 10 deletions.
@@ -131,6 +131,8 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false
@Args4jOption(required = false, name = "-save_as_dataset", usage = "EXPERIMENTAL: Save as a Parquet format Spark-SQL dataset")
var saveAsDataset = false

var command: String = null
}
@@ -562,8 +564,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
mergedSd
}

outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
if (args.saveAsDataset) {
outputRdd.saveAsPartitionedParquet(args.outputPath)
} else {
outputRdd.save(args,
isSorted = args.sortReads || args.sortLexicographically)
}
}

private def createKnownSnpsTable(sc: SparkContext): SnpTable = {
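The programmatic equivalent of the new -save_as_dataset flag is the saveAsPartitionedParquet call shown above; a minimal sketch (paths are placeholders, and loadAlignments is ADAMContext's existing alignment loader):

    val reads = sc.loadAlignments("sample.bam")
    // Same call TransformAlignments makes when -save_as_dataset is set:
    reads.saveAsPartitionedParquet("sample.alignments.adam")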
148 changes: 148 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -1866,6 +1866,35 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into an AlignmentRecordRDD.
*
* @note The sequence dictionary is read from an Avro file stored at
* pathName/_seqdict.avro and the record group dictionary is read from an
* Avro file stored at pathName/_rgdict.avro. These files are pure Avro,
* not Parquet + Avro.
*
* @param pathName The path name to load alignment records from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns an AlignmentRecordRDD.
*/
def loadPartitionedParquetAlignments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): AlignmentRecordRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val reads = loadParquetAlignments(pathName)

val datasetBoundAlignmentRecordRDD = if (regions.nonEmpty) {
DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize))
.filterByOverlappingRegions(regions)
} else {
DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize))
}

datasetBoundAlignmentRecordRDD
}
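A brief usage sketch (hypothetical paths; ReferenceRegion is ADAM's 0-based, end-exclusive genomic interval):

    val region = ReferenceRegion("chr1", 100000L, 200000L)
    val reads = sc.loadPartitionedParquetAlignments("sample.alignments.adam", Iterable(region))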

/**
* Load unaligned alignment records from interleaved FASTQ into an AlignmentRecordRDD.
*
@@ -2249,6 +2278,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a GenotypeRDD.
*
* @param pathName The path name to load genotypes from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a GenotypeRDD.
*/
def loadPartitionedParquetGenotypes(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): GenotypeRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val genotypes = loadParquetGenotypes(pathName)

val datasetBoundGenotypeRDD = if (regions.nonEmpty) {
DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize))
.filterByOverlappingRegions(regions)
} else {
DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize))
}

datasetBoundGenotypeRDD
}

/**
* Load a path name in Parquet + Avro format into a VariantRDD.
*
@@ -2282,6 +2335,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a VariantRDD.
*
* @param pathName The path name to load variants from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a VariantRDD.
*/
def loadPartitionedParquetVariants(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): VariantRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val variants = loadParquetVariants(pathName)

val datasetBoundVariantRDD = if (regions.nonEmpty) {
DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines, Some(partitionedBinSize))
.filterByOverlappingRegions(regions)
} else {
DatasetBoundVariantRDD(variants.dataset, variants.sequences, variants.headerLines, Some(partitionedBinSize))
}

datasetBoundVariantRDD
}

/**
* Load nucleotide contig fragments from FASTA into a NucleotideContigFragmentRDD.
*
@@ -2598,6 +2675,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a FeatureRDD.
*
* @param pathName The path name to load features from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a FeatureRDD.
*/
def loadPartitionedParquetFeatures(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): FeatureRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val features = loadParquetFeatures(pathName)

val datasetBoundFeatureRDD = if (regions.nonEmpty) {
DatasetBoundFeatureRDD(features.dataset, features.sequences, Some(partitionedBinSize))
.filterByOverlappingRegions(regions)
} else {
DatasetBoundFeatureRDD(features.dataset, features.sequences, Some(partitionedBinSize))
}

datasetBoundFeatureRDD
}

/**
* Load a path name in Parquet + Avro format into a NucleotideContigFragmentRDD.
*
@@ -2630,6 +2731,30 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}

/**
* Load a path name in range-binned partitioned Parquet + Avro format into a NucleotideContigFragmentRDD.
*
* @param pathName The path name to load nucleotide contig fragments from.
* Globs/directories are supported.
* @param regions Optional list of genomic regions to load.
* @return Returns a NucleotideContigFragmentRDD.
*/
def loadPartitionedParquetContigFragments(pathName: String,
regions: Iterable[ReferenceRegion] = Iterable.empty): NucleotideContigFragmentRDD = {

val partitionedBinSize = getPartitionedBinSize(pathName)
val contigs = loadParquetContigFragments(pathName)

val datasetBoundNucleotideContigFragmentRDD = if (regions.nonEmpty) {
DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences, Some(partitionedBinSize))
.filterByOverlappingRegions(regions)
} else {
DatasetBoundNucleotideContigFragmentRDD(contigs.dataset, contigs.sequences, Some(partitionedBinSize))
}

datasetBoundNucleotideContigFragmentRDD
}

/**
* Load a path name in Parquet + Avro format into a FragmentRDD.
*
@@ -3051,4 +3176,27 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
loadParquetFragments(pathName, optPredicate = optPredicate, optProjection = optProjection)
}
}

/**
* Return the partition bin size of a partitioned Parquet + Avro dataset.
*
* @note If a glob is used, all directories within the glob must be partitioned, and must have been saved
* using the same partition bin size. Behavior is undefined if this requirement is not met.
*
* @param pathName Path in which to look for the partitioned flag.
* @return Returns the partition bin size, in base pairs.
* @throws FileNotFoundException If the path is not partitioned.
*/
def getPartitionedBinSize(pathName: String): Int = {

val partitionSize = try {
val f = getFsAndFilesWithFilter(pathName, new FileFilter("_isPartitionedByStartPos")).head
f.getFileSystem(sc.hadoopConfiguration).open(f).readInt()
} catch {
case e: FileNotFoundException => {
throw new FileNotFoundException("Input Parquet files (%s) are not partitioned.".format(pathName))
}
}
partitionSize
}
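For example (a hedged sketch; the path is a placeholder):

    val binSize = sc.getPartitionedBinSize("sample.alignments.adam")
    // binSize is e.g. 1000000; a FileNotFoundException is raised if the path
    // was not written by saveAsPartitionedParquet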

}
@@ -20,14 +20,15 @@ package org.bdgenomics.adam.rdd
import java.nio.file.Paths
import htsjdk.samtools.ValidationStringency
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{ FSDataOutputStream, FileSystem, Path }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{ Function => JFunction, Function2 }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Dataset, SQLContext }
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
@@ -54,6 +55,7 @@ import scala.collection.JavaConversions._
import scala.math.min
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

private[rdd] class JavaSaveArgs(var outputPath: String,
var blockSize: Int = 128 * 1024 * 1024,
@@ -2324,6 +2326,19 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Geno
convFn.call(v, dsX)
})
}

/**
* Filters and replaces the underlying dataset based on overlap with any of the query ReferenceRegions.
*
* @param querys ReferenceRegions to filter against.
* @param optPartitionedLookBackNum Optional number of Parquet position bins to look back to find the start of a
* ReferenceRegion; defaults to 1.
* @return Returns a new DatasetBoundGenomicRDD with the ReferenceRegions filter applied.
*/
def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
optPartitionedLookBackNum: Option[Int] = Some(1)): V = {
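// Default implementation: ignore optPartitionedLookBackNum and fall back to the generic
// region filter; the DatasetBound implementations in this commit override this to prune by positionBin.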
filterByOverlappingRegions(querys)
}
}

trait GenomicRDDWithLineage[T, U <: GenomicRDDWithLineage[T, U]] extends GenomicRDD[T, U] {
@@ -2531,6 +2546,23 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
saveSequences(filePath)
}

/**
* Builds a Spark SQL predicate string matching records that overlap any of the given ReferenceRegions.
*
* @param regions ReferenceRegions to filter against.
* @param partitionSize Partition bin size, in base pairs, that the dataset was written with.
* @param partitionedLookBackNum Number of position bins to look back to find the start of a region.
* @return Returns a predicate string suitable for Dataset.filter.
*/
protected def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000, partitionedLookBackNum: Int = 1): String = {

// Test whether this dataset is bound to partitioned Parquet having the field positionBin.
if (Try(dataset("positionBin")).isSuccess) {
regions.map(r => "(contigName=" + "\'" + r.referenceName +
"\' and positionBin >= \'" + ((scala.math.floor(r.start / partitionSize).toInt) - partitionedLookBackNum) +
"\' and positionBin < \'" + (scala.math.floor(r.end / partitionSize).toInt + 1) +
"\' and (end > " + r.start + " and start < " + r.end + "))")
.mkString(" or ")
} else { // If no positionBin field is found, construct the query without the bin optimization.
regions.map(r => "(contigName=" + "\'" + r.referenceName +
"\' and (end > " + r.start + " and start < " + r.end + "))")
.mkString(" or ")
}
}
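As a worked example of the predicate this builds (computed from the code above): for ReferenceRegion("chr1", 100000L, 200000L) with the default partitionSize of 1000000 and partitionedLookBackNum of 1, floor(100000 / 1000000) - 1 = -1 and floor(200000 / 1000000) + 1 = 1, giving:

    (contigName='chr1' and positionBin >= '-1' and positionBin < '1' and (end > 100000 and start < 200000))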

/**
* Saves RDD as a directory of Parquet files.
*
@@ -2606,4 +2638,41 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
def saveAsParquet(filePath: java.lang.String) {
saveAsParquet(new JavaSaveArgs(filePath))
}

/**
* Writes a flag file that marks this directory as range-binned partitioned Parquet and records the partition bin size.
*
* @param filePath Path of the partitioned Parquet dataset directory.
* @param partitionSize Partition bin size, in base pairs, recorded in the flag file.
*/
private def writePartitionedParquetFlag(filePath: String, partitionSize: Int): Unit = {
val path = new Path(filePath, "_isPartitionedByStartPos")
val fs: FileSystem = path.getFileSystem(rdd.context.hadoopConfiguration)
val f = fs.create(path)
f.writeInt(partitionSize)
f.close()
}

/**
* Saves this RDD to disk in range-binned partitioned Parquet + Avro format.
*
* @param filePath Path to save the file at.
* @param compressCodec Name of the compression codec to use.
* @param partitionSize Size of the partition bins used when writing Parquet, in base pairs. Defaults to 1000000.
*/
def saveAsPartitionedParquet(filePath: String,
compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
partitionSize: Int = 1000000) {
log.warn("Saving directly as Hive-partitioned Parquet from SQL. " +
"Options other than compression codec are ignored.")
val df = toDF()
df.withColumn("positionBin", floor(df("start") / partitionSize))
.write
.partitionBy("contigName", "positionBin")
.format("parquet")
.option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
.save(filePath)
writePartitionedParquetFlag(filePath, partitionSize)
saveMetadata(filePath)
}

}
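Putting the pieces together, a hedged round-trip sketch of the save and load paths added in this commit (paths, codec, and bin size are illustrative):

    import org.apache.parquet.hadoop.metadata.CompressionCodecName
    val reads = sc.loadAlignments("sample.bam")
    // Writes Hive-style partitions on contigName / positionBin, 2 Mbp per bin:
    reads.saveAsPartitionedParquet("sample.adam", CompressionCodecName.SNAPPY, partitionSize = 2000000)
    // Reloads; the bin size is recovered from the _isPartitionedByStartPos flag file:
    val reloaded = sc.loadPartitionedParquetAlignments("sample.adam")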
@@ -137,9 +137,9 @@ case class ParquetUnboundNucleotideContigFragmentRDD private[rdd] (

case class DatasetBoundNucleotideContigFragmentRDD private[rdd] (
dataset: Dataset[NucleotideContigFragmentProduct],
sequences: SequenceDictionary) extends NucleotideContigFragmentRDD
sequences: SequenceDictionary,
partitionedBinSize: Option[Int] = None) extends NucleotideContigFragmentRDD
with DatasetBoundGenomicDataset[NucleotideContigFragment, NucleotideContigFragmentProduct, NucleotideContigFragmentRDD] {

lazy val rdd: RDD[NucleotideContigFragment] = dataset.rdd.map(_.toAvro)

protected lazy val optPartitionMap = None
@@ -162,6 +162,24 @@ case class DatasetBoundNucleotideContigFragmentRDD private[rdd] (
newSequences: SequenceDictionary): NucleotideContigFragmentRDD = {
copy(sequences = newSequences)
}

override def transformDataset(tFn: Dataset[NucleotideContigFragmentProduct] => Dataset[NucleotideContigFragmentProduct]): NucleotideContigFragmentRDD = {
copy(dataset = tFn(dataset))
}

/**
* Filters and replaces the underlying dataset based on overlap with any of the query ReferenceRegions.
*
* @note Requires partitionedBinSize to be defined, i.e. this RDD was loaded as partitioned Parquet.
* @param querys ReferenceRegions to filter against.
* @param optPartitionedLookBackNum Optional number of Parquet position bins to look back to find the start of a
* ReferenceRegion; defaults to 1.
* @return Returns a new DatasetBoundNucleotideContigFragmentRDD with the ReferenceRegions filter applied.
*/
override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
optPartitionedLookBackNum: Option[Int] = Some(1)): NucleotideContigFragmentRDD = {
transformDataset((d: Dataset[org.bdgenomics.adam.sql.NucleotideContigFragment]) =>
d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get)))
}
}

/**
@@ -276,7 +276,8 @@ case class ParquetUnboundFeatureRDD private[rdd] (

case class DatasetBoundFeatureRDD private[rdd] (
dataset: Dataset[FeatureProduct],
sequences: SequenceDictionary) extends FeatureRDD
sequences: SequenceDictionary,
partitionedBinSize: Option[Int] = None) extends FeatureRDD
with DatasetBoundGenomicDataset[Feature, FeatureProduct, FeatureRDD] {

lazy val rdd = dataset.rdd.map(_.toAvro)
@@ -312,6 +313,22 @@ case class DatasetBoundFeatureRDD private[rdd] (
.withColumnRenamed("score", "count")
.as[Coverage], sequences)
}

/**
* Filters and replaces the underlying dataset based on overlap with any of the query ReferenceRegions.
*
* @note Requires partitionedBinSize to be defined, i.e. this RDD was loaded as partitioned Parquet.
* @param querys ReferenceRegions to filter against.
* @param optPartitionedLookBackNum Optional number of Parquet position bins to look back to find the start of a
* ReferenceRegion; defaults to 1.
* @return Returns a new DatasetBoundFeatureRDD with the ReferenceRegions filter applied.
*/
override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
optPartitionedLookBackNum: Option[Int] = Some(1)): FeatureRDD = {

transformDataset((d: Dataset[org.bdgenomics.adam.sql.Feature]) =>
d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get)))
}

}

case class RDDBoundFeatureRDD private[rdd] (
