[ADAM-463] Added AvroMultiParquetRDD
AvroMultiParquetRDD takes a FileLocator which indicates the _parent_
directory (in HDFS or S3) of a set of Parquet files, and loads _all_
row groups from every Parquet file that is an immediate child of that
FileLocator.

(Here, "children" is defined with respect to the childLocators method
implementation in the corresponding FileLocator.)

(Joint work with Carl Yeksigian.)
tdanford committed Nov 14, 2014
1 parent b870b41 commit ed82e55
Showing 4 changed files with 67 additions and 7 deletions.
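
As a minimal driver-side sketch of what this commit enables (mirroring the new test below; the "parquet_test" resource, the null filter, and the 400-record count come from this commit's test file, while the ClasspathFileLocator import path is an assumption based on the pre-change imports):

import org.apache.spark.SparkContext
import org.bdgenomics.adam.parquet_reimpl.AvroMultiParquetRDD
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.services.ClasspathFileLocator // package may differ across ADAM versions

val sc = new SparkContext("local", "multi-parquet-sketch")
// Locator for the *parent* directory; every immediate child Parquet file is loaded.
val locator = new ClasspathFileLocator("parquet_test")
val reads = new AvroMultiParquetRDD[AlignmentRecord](sc, locator, null, None)
println(reads.count()) // 400 reads across the two bundled test files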
@@ -73,22 +73,22 @@ package org.bdgenomics.adam.parquet_reimpl {
*/

/*
  Step 1: find the relevant index entries
*/
val entries: Iterable[IDRangeIndexEntry] = index.findIndexEntries(filter.indexPredicate)

/*
  Step 2: get the parquet file metadata.
*/
val parquetFiles: Map[String, ParquetFileMetadata] = entries.map(_.path).toSeq.distinct.map {
case parquetFilePath: String =>
val parquetLocator = dataRootLocator.relativeLocator(parquetFilePath)
parquetFilePath -> AvroParquetFileMetadata(parquetLocator, requestedSchema)
}.toMap

/*
  Step 3: build the ParquetPartition values.
*/
entries.toArray.map {
case IDRangeIndexEntry(path, i, sample, range) => parquetFiles(path).partition(i)
}
@@ -106,6 +106,58 @@ package org.bdgenomics.adam.parquet_reimpl {
}
}

/**
 * An RDD over all row groups of all Parquet files that are immediate
 * children (per FileLocator.childLocators) of dataRootLocator.
 */
class AvroMultiParquetRDD[T <: IndexedRecord: ClassTag](@transient sc: SparkContext,
                                                        private val dataRootLocator: FileLocator,
                                                        private val filter: UnboundRecordFilter,
                                                        @transient private val requestedSchema: Option[Schema])
  extends RDD[T](sc, Nil) {

  // With no requested schema, read with the file's own schema; otherwise
  // convert the requested Avro schema into a Parquet MessageType.
  def convertAvroSchema(schema: Option[Schema], fileMessageType: MessageType): MessageType =
    schema match {
      case None    => fileMessageType
      case Some(s) => new AvroSchemaConverter().convert(s)
    }

  override protected def getPartitions: Array[Partition] = {

    val childLocators = dataRootLocator.childLocators()

    // Read each child file's footer and pair every row group with its
    // file locator and the (requested, file) schema types.
    val rgSchemaTypes = childLocators.flatMap {
      case loc =>
        val fileMetadata = ParquetCommon.readFileMetadata(loc.bytes)
        val footer = new Footer(fileMetadata)
        val fileMessageType = ParquetCommon.parseMessageType(fileMetadata)
        val fileSchema = new ParquetSchemaType(fileMessageType)
        val requestedMessage = convertAvroSchema(requestedSchema, fileMessageType)
        val requested = new ParquetSchemaType(requestedMessage)

        footer.rowGroups.map {
          case rg => (rg, loc, requested, fileSchema)
        }
    }

    // One partition per row group, numbered consecutively across all files.
    rgSchemaTypes.zipWithIndex.map {
      case ((rg, parquetFile, requested, fileSchema), i) =>
        new ParquetPartition(parquetFile, i, rg, requested, fileSchema)
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val reqSchema = classTag[T].runtimeClass.newInstance().asInstanceOf[T].getSchema
    val parquetPartition = split.asInstanceOf[ParquetPartition]
    val byteAccess = parquetPartition.locator.bytes
    def requestedMessageType = parquetPartition.requestedSchema.convertToParquet()

    val avroRecordMaterializer = new UsableAvroRecordMaterializer[T](requestedMessageType, reqSchema)

    parquetPartition.materializeRecords(byteAccess, avroRecordMaterializer, filter)
  }
}
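
For orientation, the FileLocator surface that AvroMultiParquetRDD relies on can be summarized as a toy trait. This is a sketch inferred from the three calls made above (childLocators, relativeLocator, bytes), not ADAM's actual definition:

// Hypothetical sketch of the FileLocator contract, inferred from this diff.
// Placeholder for ADAM's byte-access abstraction.
trait ByteAccessSketch

trait FileLocatorSketch {
  // Immediate children of this locator (e.g. the files directly inside an
  // HDFS or S3 "directory"); this defines which Parquet files get loaded.
  def childLocators(): Iterable[FileLocatorSketch]

  // A locator for a path resolved relative to this one.
  def relativeLocator(relativePath: String): FileLocatorSketch

  // Random access to the located file's contents.
  def bytes: ByteAccessSketch
}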

class AvroParquetRDD[T <: IndexedRecord: ClassTag](@transient sc: SparkContext,
private val filter: UnboundRecordFilter,
private val parquetFile: FileLocator,
Binary file not shown.
Binary file not shown.
@@ -29,7 +29,6 @@ import org.bdgenomics.adam.parquet_reimpl.index._
import org.bdgenomics.adam.projections.Projection
import org.bdgenomics.adam.util._
import org.bdgenomics.formats.avro.{ AlignmentRecord, FlatGenotype }
import org.bdgenomics.services.ClasspathFileLocator
import parquet.column.ColumnReader
import parquet.filter.{ RecordFilter, UnboundRecordFilter }

@@ -47,6 +46,15 @@ class RDDFunSuite extends SparkFunSuite {
new LocalFileLocator(resourceFile(resourceName))
}

class AvroMultiParquetRDDSuite extends RDDFunSuite {

  sparkTest("test load parquet_test directory loads all reads from two files") {
    val locator = new ClasspathFileLocator("parquet_test")
    val multiLoader = new AvroMultiParquetRDD[AlignmentRecord](sc, locator, null, None)
    assert(multiLoader.count() === 400)
  }
}
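
Passing None for requestedSchema reads with each file's own schema; a projection can prune columns instead. An illustrative variant (not part of this commit; it assumes ADAM's Projection helper, already imported in this test file, and AlignmentRecordField field names):

sparkTest("hypothetical: load a two-column projection") {
  val locator = new ClasspathFileLocator("parquet_test")
  // Projection(...) builds a pruned Avro Schema from the named fields.
  val schema = Projection(AlignmentRecordField.readName, AlignmentRecordField.sequence)
  val projected = new AvroMultiParquetRDD[AlignmentRecord](sc, locator, null, Some(schema))
  assert(projected.count() === 400) // projection changes columns, not row count
}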

class AvroIndexedParquetRDDSuite extends RDDFunSuite {

def writeIndexAsBytes(rootLocator: FileLocator, parquets: String*): FileLocator = {
