Skip to content

Commit

Permalink
[ADAM-1417] Removed unused Projection.apply method, add test for Filter.
Browse files Browse the repository at this point in the history
Resolves #1417.
  • Loading branch information
fnothaft authored and heuermh committed Jul 8, 2017
1 parent 8572fb7 commit 0e92cfe
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ object Projection {
private def createFilterPredicate(fieldNames: Set[String],
exclude: Boolean = false): Field => Boolean = {
val filterPred = (f: Field) => fieldNames.contains(f.name)
val includeOrExlcude = (contains: Boolean) => if (exclude) !contains else contains
filterPred.andThen(includeOrExlcude)
val includeOrExclude = (contains: Boolean) => if (exclude) !contains else contains
filterPred.andThen(includeOrExclude)
}

/**
Expand All @@ -60,25 +60,6 @@ object Projection {
Projection(false, includedFields: _*)
}

/**
* Creates a projection that includes a variable number of fields.
*
* @param includedFields Fields to include in the projection
* @return Returns the specified schema with the fields predicated out.
*
* @note The schema is inferred from the provided FieldValues. Undefined
* behavior may result if you provide FieldValues from multiple different
* FieldEnumerations.
*
* @throws IllegalArgumentException if there are no fields included in the
* projection.
*/
def apply(includedFields: Traversable[FieldValue]): Schema = {
require(includedFields.size > 0, "Can't project down to zero fields!")
val baseSchema = includedFields.head.schema
createProjection(baseSchema, includedFields.map(_.toString).toSet, false)
}

/**
* Creates a projection that either includes or excludes a variable number of
* fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,20 @@ class AlignmentRecordFieldSuite extends ADAMFunSuite {
assert(alignmentRecords.first.getContigName === "6")
assert(alignmentRecords.first.getStart === 29941260L)
}

sparkTest("filter contigName when reading parquet alignment records") {
val path = tmpFile("alignmentRecords.parquet")
val rdd = sc.parallelize(Seq(AlignmentRecord.newBuilder()
.setContigName("6")
.setStart(29941260L)
.build()))
rdd.saveAsParquet(TestSaveArgs(path))

val projection = Filter(contigName)

val alignmentRecords: RDD[AlignmentRecord] = sc.loadParquet(path, optProjection = Some(projection))
assert(alignmentRecords.count() === 1)
assert(alignmentRecords.first.getContigName == null)
assert(alignmentRecords.first.getStart === 29941260L)
}
}

0 comments on commit 0e92cfe

Please sign in to comment.