Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Predicate to filter conversion #234

Merged
merged 1 commit into from
May 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.bdgenomics.adam.avro.ADAMGenotype
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.variation.ADAMVariationContext._
import org.apache.spark.SparkContext._
import org.bdgenomics.adam.predicates.GenotypeVarFilterPASSPredicate
import org.bdgenomics.adam.predicates.GenotypeRecordPASSPredicate
import org.bdgenomics.adam.projections.variation.ADAMGenotypeField
import org.bdgenomics.adam.rdd.variation.ConcordanceTable

Expand Down Expand Up @@ -57,7 +57,7 @@ class GenotypeConcordance(protected val args: GenotypeConcordanceArgs) extends A
val predicate = if (!args.includeNonPass) {
// We also need to project the filter field to use this predicate
// project :+= varIsFiltered
Some(classOf[GenotypeVarFilterPASSPredicate])
Some(classOf[GenotypeRecordPASSPredicate])
} else
None
val projection = None //Some(Projection(project))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.Job
import org.bdgenomics.adam.rdd.ADAMContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.avro.ADAMRecord
import org.bdgenomics.adam.predicates.LocusPredicate
import org.bdgenomics.adam.predicates.UniqueMappedReadPredicate

object MpileupCommand extends ADAMCommandCompanion {
val commandName: String = "mpileup"
Expand All @@ -47,7 +47,7 @@ class MpileupCommand(protected val args: MpileupArgs) extends ADAMSparkCommand[M

def run(sc: SparkContext, job: Job) {

val reads: RDD[ADAMRecord] = sc.adamLoad(args.readInput, Some(classOf[LocusPredicate]))
val reads: RDD[ADAMRecord] = sc.adamLoad(args.readInput, Some(classOf[UniqueMappedReadPredicate]))

val pileups = new PileupTraversable(reads)
for (pileup <- pileups) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PileupAggregator(protected val args: PileupAggregatorArgs)
val companion = PileupAggregator

def run(sc: SparkContext, job: Job) {
val pileups: RDD[ADAMPileup] = sc.adamLoad(args.readInput, predicate = Some(classOf[LocusPredicate]))
val pileups: RDD[ADAMPileup] = sc.adamLoad(args.readInput)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this was using LocusPredicate before?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That predicate is necessary. Pileups are created from mapped reads only.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put a comment to keep others from tripping up on this too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure — but this is after pileup creation, right? Also, the fields that LocusPredicate checks against are not defined for an ADAMPileup. I was going to substitute the MappedReadPredicate, but couldn't because of the typing: it is only applicable to ADAMRecord.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, Arun. This is after pileup creation so the predicate isn't needed.

pileups.adamAggregatePileups().adamSave(args.pileupOutput,
blockSize = args.blockSize, pageSize = args.pageSize,
compressCodec = args.compressionCodec, disableDictionaryEncoding = args.disableDictionary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
import parquet.filter.UnboundRecordFilter
import org.apache.avro.specific.SpecificRecord
import org.bdgenomics.adam.avro.ADAMRecord
import org.bdgenomics.adam.predicates.ADAMPredicate

/**
* This set of classes executes a plugin along with the associated input location.
Expand Down Expand Up @@ -76,7 +77,7 @@ class PluginExecutor(protected val args: PluginExecutorArgs) extends ADAMSparkCo
}

def load[Input <% SpecificRecord: Manifest](sc: SparkContext, locations: String, projection: Option[Schema]): RDD[Input] = {
sc.adamLoad[Input, UnboundRecordFilter](locations, projection = projection)
sc.adamLoad[Input, ADAMPredicate[Input]](locations, projection = projection)
}

def output[Output](sc: SparkContext, output: RDD[Output]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package org.bdgenomics.adam.cli

import org.apache.hadoop.mapreduce.Job
import org.bdgenomics.adam.predicates.LocusPredicate
import org.bdgenomics.adam.predicates.{ UniqueMappedReadPredicate, LocusPredicate }
import org.kohsuke.args4j.{ Option => option, Argument }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.avro.{ ADAMPileup, ADAMRecord }
Expand Down Expand Up @@ -57,7 +57,7 @@ class Reads2Ref(protected val args: Reads2RefArgs) extends ADAMSparkCommand[Read
val companion = Reads2Ref

def run(sc: SparkContext, job: Job) {
val reads: RDD[ADAMRecord] = sc.adamLoad(args.readInput, Some(classOf[LocusPredicate]))
val reads: RDD[ADAMRecord] = sc.adamLoad(args.readInput, Some(classOf[UniqueMappedReadPredicate]))

val readCount = reads.count()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2014. Mount Sinai School of Medicine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.predicates

import org.bdgenomics.adam.avro.{ ADAMRecord, ADAMGenotype }

object ADAMGenotypeConditions {

  /** Passes genotypes whose variant-calling annotations mark the call as PASS. */
  val isPassing = RecordCondition[ADAMGenotype](FieldCondition("variantCallingAnnotations.variantIsPassing", PredicateUtils.isTrue))

  /**
   * Passes genotypes whose read depth is at least `minReadDepth`.
   *
   * Fixed: the condition is built over ADAMGenotype — the record type that
   * actually carries `variantCallingAnnotations` (as `isPassing` above does) —
   * not ADAMRecord, whose schema has no such field and would fail at lookup
   * time. The comparison is inclusive so a depth equal to the requested
   * minimum passes, matching the "min" in the name.
   *
   * @param minReadDepth smallest read depth (inclusive) that passes
   */
  def hasMinReadDepth(minReadDepth: Int) = RecordCondition[ADAMGenotype](FieldCondition("variantCallingAnnotations.readDepth", (x: Int) => x >= minReadDepth))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2014. Mount Sinai School of Medicine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.predicates

import parquet.filter.{ RecordFilter, UnboundRecordFilter }
import java.lang.Iterable
import parquet.column.ColumnReader
import org.apache.spark.rdd.RDD

/**
*
* ADAMPredicate: Classes derived from ADAMPredicate can be used to set ParquetInputFormat.setUnboundRecordFilter
* for predicate pushdown, or alternatively, filter an already loaded RDD
*
*/
/**
 * Base trait for ADAM predicates.
 *
 * An implementation supplies a [[RecordCondition]]; the predicate can then be
 * used in two ways: as a Parquet UnboundRecordFilter handed to
 * ParquetInputFormat.setUnboundRecordFilter for predicate pushdown, or
 * applied directly to filter an RDD that has already been loaded.
 */
trait ADAMPredicate[T] extends UnboundRecordFilter {
  /** Condition evaluated against each record. */
  val recordCondition: RecordCondition[T]

  /** Filters an in-memory RDD with the same condition used for pushdown. */
  final def apply(rdd: RDD[T]): RDD[T] = rdd.filter(recordCondition.filter)

  /** Delegates Parquet column binding to the condition's record filter. */
  override def bind(readers: Iterable[ColumnReader]): RecordFilter =
    recordCondition.recordFilter.bind(readers)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2014. Mount Sinai School of Medicine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.predicates

import org.bdgenomics.adam.projections.ADAMRecordField
import org.bdgenomics.adam.avro.ADAMRecord

/**
 * Ready-made conditions over ADAMRecord fields, for composing read
 * predicates (see e.g. HighQualityReadPredicate).
 */
object ADAMRecordConditions {

  // All the boolean-flag conditions share one shape: test a single named
  // field of the record against a Boolean predicate.
  private def flagCondition(fieldName: String, test: Boolean => Boolean) =
    RecordCondition[ADAMRecord](FieldCondition(fieldName, test))

  /** Read is mapped to the reference. */
  val isMapped = flagCondition(ADAMRecordField.readMapped.toString, PredicateUtils.isTrue)
  /** Read is not flagged as a duplicate. */
  val isUnique = flagCondition(ADAMRecordField.duplicateRead.toString, PredicateUtils.isFalse)

  /** Read is the primary alignment. */
  val isPrimaryAlignment = flagCondition(ADAMRecordField.primaryAlignment.toString, PredicateUtils.isTrue)

  /** Read did not fail vendor quality checks. */
  val passedVendorQualityChecks = flagCondition(ADAMRecordField.failedVendorQualityChecks.toString, PredicateUtils.isFalse)

  /** Read has mapping quality strictly greater than `minQuality`. */
  def isHighQuality(minQuality: Int) =
    RecordCondition[ADAMRecord](FieldCondition(ADAMRecordField.mapq.toString, (quality: Int) => quality > minQuality))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2014. Mount Sinai School of Medicine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.predicates

import parquet.filter.ColumnPredicates.Predicate
import parquet.column.ColumnReader
import parquet.filter.UnboundRecordFilter
import parquet.filter.ColumnRecordFilter._
import org.bdgenomics.adam.predicates.ColumnReaderInput.ColumnReaderInput
import scala.Predef._

/**
 * Typeclass that reads the current value of a Parquet column as a Scala
 * primitive, so that FieldCondition can be typed against T rather than
 * against the raw ColumnReader.
 */
object ColumnReaderInput extends Serializable {
trait ColumnReaderInput[T] extends Serializable {
// Decodes the column's current value as a T.
def convert(x: ColumnReader): T
}
// Implicit instances for each primitive column type Parquet exposes.
implicit object ColumnReaderInputInt extends ColumnReaderInput[Int] {
def convert(input: ColumnReader): Int = input.getInteger
}
// Strings are stored as binary; decoded assuming UTF-8.
implicit object ColumnReaderInputString extends ColumnReaderInput[String] {
def convert(input: ColumnReader): String = input.getBinary.toStringUsingUTF8
}
implicit object ColumnReaderInputDouble extends ColumnReaderInput[Double] {
def convert(input: ColumnReader): Double = input.getDouble
}
implicit object ColumnReaderInputFloat extends ColumnReaderInput[Float] {
def convert(input: ColumnReader): Float = input.getFloat
}
implicit object ColumnReaderInputBoolean extends ColumnReaderInput[Boolean] {
def convert(input: ColumnReader): Boolean = input.getBoolean
}
}

/**
 * A predicate on a single record field, addressed by a dotted path for
 * nested records (e.g. "variantCallingAnnotations.readDepth").
 *
 * Doubles as a Parquet column predicate: the ColumnReader overload is used
 * during predicate pushdown, while apply(Any) is used by RecordCondition
 * when filtering already-materialized Avro records.
 *
 * @param fieldName dotted path to the field being tested
 * @param filter    test applied to the field's value
 */
private[predicates] case class FieldCondition[T](val fieldName: String, filter: T => Boolean)(implicit converter: ColumnReaderInput[T]) extends Predicate {

// In-memory path: value already extracted from an Avro record.
// NOTE(review): unchecked cast — relies on the caller passing the field's
// actual value; with erasure a type mismatch only surfaces at filter time.
def apply(input: Any): Boolean = {
filter(input.asInstanceOf[T])
}

// Pushdown path: value decoded straight from the Parquet column.
override def apply(input: ColumnReader): Boolean = {
filter(converter.convert(input))
}

// Lifts this condition into a Parquet record filter bound to its column.
def columnFilter: UnboundRecordFilter = column(fieldName, this)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@

package org.bdgenomics.adam.predicates

import parquet.filter.{ RecordFilter, UnboundRecordFilter }
import java.lang.Iterable
import parquet.column.ColumnReader
import parquet.filter.ColumnRecordFilter.column
import parquet.filter.OrRecordFilter.or
import parquet.filter.ColumnPredicates.equalTo
import org.bdgenomics.adam.avro.ADAMGenotype

class GenotypeRecordPASSPredicate extends ADAMPredicate[ADAMGenotype] {

override val recordCondition = RecordCondition[ADAMGenotype](FieldCondition("variantCallingAnnotations.variantIsPassing", PredicateUtils.isTrue))

class GenotypeVarFilterPASSPredicate extends UnboundRecordFilter {
def bind(readers: Iterable[ColumnReader]): RecordFilter = {
column("variantCallingAnnotations.variantIsPassing", equalTo(true)).bind(readers)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2014. Mount Sinai School of Medicine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.predicates

import org.bdgenomics.adam.avro.ADAMRecord

/**
 * Accepts reads that are mapped and whose mapping quality is strictly
 * greater than [[HighQualityReadPredicate.MinMappingQuality]].
 *
 * Must keep a no-argument constructor: predicates are passed around as
 * `classOf[...]` and instantiated reflectively, so the threshold is a named
 * constant rather than a constructor parameter.
 */
class HighQualityReadPredicate extends ADAMPredicate[ADAMRecord] {

  override val recordCondition = ADAMRecordConditions.isMapped && ADAMRecordConditions.isHighQuality(HighQualityReadPredicate.MinMappingQuality)

}

object HighQualityReadPredicate {
  /** Mapping-quality threshold (exclusive): reads must have mapq > this. */
  val MinMappingQuality = 30
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.bdgenomics.adam.predicates

/**
 * Small reusable Boolean predicates for building field conditions.
 */
object PredicateUtils {

  /** True iff the input is true (identity on Boolean). */
  val isTrue: Boolean => Boolean = identity

  /** True iff the input is false (Boolean negation). */
  val isFalse: Boolean => Boolean = !_

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2014. Mount Sinai School of Medicine
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bdgenomics.adam.predicates

import org.apache.avro.specific.SpecificRecord
import org.apache.avro.Schema
import org.apache.avro.Schema.Field
import parquet.filter.{ AndRecordFilter, OrRecordFilter, UnboundRecordFilter }

import org.bdgenomics.adam.util.ImplicitJavaConversions._
import scala.annotation.tailrec

/**
 * Factory for [[RecordCondition]]: lifts conditions on individual fields
 * into predicates over whole Avro records.
 */
private[predicates] object RecordCondition {

// Convert a predicate on a single field into a predicate on the record,
// walking the dotted field path down through nested records.
// NOTE(review): assumes every field on the path is declared in the Avro
// schema as a union containing NULL (field.schema.getTypes) — a non-union
// field or an unknown field name would throw here; confirm all filtered
// fields are optional in the schema.
def getRecordPredicate[T <: SpecificRecord: Manifest, U](condition: FieldCondition[U]): T => Boolean = {
@tailrec
def getFieldValue(record: SpecificRecord, fieldPath: Seq[String]): Any = {
val schema = record.getSchema
val field: Field = schema.getField(fieldPath.head)
// Strip the NULL branch of the union to find the field's concrete type.
val fieldType = field.schema.getTypes.filter(_.getType != Schema.Type.NULL)(0)
if (fieldType.getType == Schema.Type.RECORD) {
// Descend into the nested record with the rest of the path.
// NOTE(review): if the optional nested record is unset (null), the
// recursive call NPEs — verify against data where the field is absent.
getFieldValue(record.get(field.pos).asInstanceOf[SpecificRecord], fieldPath.tail)
} else {
record.get(field.pos)
}
}

// The returned closure resolves the field path on each record and applies
// the field-level filter to the extracted value.
(record: T) => {
val fieldName = condition.fieldName
val filter: Any => Boolean = condition.apply
val fieldValue = getFieldValue(record, fieldName.split("\\."))
filter(fieldValue)
}
}

// Create a record predicate from many individual field predicates, AND-ed
// together. At least one condition is required: reduce throws on empty input.
def apply[T <: SpecificRecord: Manifest](conditions: FieldCondition[_]*): RecordCondition[T] = {
conditions.map(c => {
val fieldPredicate = getRecordPredicate(c)
new RecordCondition(fieldPredicate, c.columnFilter)
}).reduce(_ && _)
}
}

/**
*
* A RecordCondition is a filter on any Avro defined records and
* contains an UnboundRecordFilter that can be used for predicate pushdown
* with Parquet stored files
*
*/
/**
 * A filter on Avro-defined records of type T, paired with an
 * UnboundRecordFilter so the same condition can also be pushed down into
 * Parquet reads. Conditions compose with and/&& and or/||.
 */
private[predicates] class RecordCondition[T <% SpecificRecord: Manifest] private (val filter: T => Boolean,
  val recordFilter: UnboundRecordFilter) extends Serializable {

  /** Conjunction: a record passes only if it satisfies both conditions. */
  def &&(other: RecordCondition[T]): RecordCondition[T] = and(other)
  def and(other: RecordCondition[T]): RecordCondition[T] = {
    // Capture both functions in locals so the closure does not pull in
    // `this` (and its non-serializable parts) when shipped by Spark.
    val left = filter
    val right = other.filter

    new RecordCondition[T](filter = (record: T) => left(record) && right(record),
      recordFilter = AndRecordFilter.and(recordFilter, other.recordFilter))
  }

  /** Disjunction: a record passes if it satisfies either condition. */
  def ||(other: RecordCondition[T]): RecordCondition[T] = or(other)
  def or(other: RecordCondition[T]): RecordCondition[T] = {
    // Same local-capture trick to keep `this` out of the serialized closure.
    val left = filter
    val right = other.filter

    new RecordCondition[T](filter = (record: T) => left(record) || right(record),
      recordFilter = OrRecordFilter.or(recordFilter, other.recordFilter))
  }

  /** Evaluates this condition against a single record. */
  def apply(record: T): Boolean = filter(record)
}
Loading