StructType.scala
@@ -22,7 +22,7 @@ import scala.util.Try

import org.json4s.JsonDSL._

-import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
@@ -389,6 +389,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {

object StructType extends AbstractDataType {

+/**
+ * A key used in field metadata to indicate that the field comes from the result of merging
+ * two different StructTypes that do not always contain the field. That is to say, the field
+ * might be missing (optional) from one of the StructTypes.
+ */
private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"

override private[sql] def defaultConcreteType: DataType = new StructType
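The marker added above is ordinary field metadata, so its meaning can be illustrated with Spark's public Metadata API. The sketch below is a hypothetical illustration, not part of this patch: the OptionalFieldExample object and the column names are invented, and the "_OPTIONAL_" literal simply spells out the private[sql] constant defined above.

import org.apache.spark.sql.types._

object OptionalFieldExample {
  // Spells out StructType.metadataKeyForOptionalField, which is private[sql].
  val optionalKey = "_OPTIONAL_"

  // Suppose one Parquet part-file has only column "a" while another has "a" and "b".
  // Per the comment above, a merged schema would keep "b" but mark it as optional,
  // because one side lacks it; here the marker is attached by hand for illustration.
  val merged = StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", StringType, nullable = true,
      metadata = new MetadataBuilder().putBoolean(optionalKey, true).build())))

  // Fields carrying the marker; these are the ones filter pushdown must skip.
  val optionalFields = merged.filter { f =>
    f.metadata.contains(optionalKey) && f.metadata.getBoolean(optionalKey)
  }
}

Fields tagged this way are exactly what getFieldMap in ParquetFilters, changed below, drops before any pushdown decision.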
ParquetFilters.scala
@@ -17,27 +17,17 @@

package org.apache.spark.sql.execution.datasources.parquet

-import java.io.Serializable

import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

/**
 * Some utility functions to convert Spark data source filters to Parquet filters.
 */
private[sql] object ParquetFilters {
-case class SetInFilter[T <: Comparable[T]](
-valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
-
-override def keep(value: T): Boolean = {
-value != null && valueSet.contains(value)
-}
-
-override def canDrop(statistics: Statistics[T]): Boolean = false
-
-override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
-}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
@@ -154,52 +144,32 @@ private[sql] object ParquetFilters {
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
}

-private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
-case IntegerType =>
-(n: String, v: Set[Any]) =>
-FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
-case LongType =>
-(n: String, v: Set[Any]) =>
-FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
-case FloatType =>
-(n: String, v: Set[Any]) =>
-FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
-case DoubleType =>
-(n: String, v: Set[Any]) =>
-FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-case StringType =>
-(n: String, v: Set[Any]) =>
-FilterApi.userDefined(binaryColumn(n),
-SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
-case BinaryType =>
-(n: String, v: Set[Any]) =>
-FilterApi.userDefined(binaryColumn(n),
-SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
-}

/**
 * Returns a map from name of the column to the data type, if predicate push down applies
 * (i.e. not an optional field).
 *
 * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
 * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
- * using such fields, otherwise Parquet library will throw exception. Here we filter out such
- * fields.
+ * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
+ * Here we filter out such fields.
 */
-private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
+private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// the root fields. Currently, accessing nested fields does not push down filters,
// and creating filters for nested fields is not supported.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-}.map(f => f.name -> f.dataType)
-case _ => Array.empty[(String, DataType)]
+}.map(f => f.name -> f.dataType).toMap
+case _ => Map.empty[String, DataType]
}

/**
 * Converts data source filters to Parquet filter predicates.
 */
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-val dataTypeOf = getFieldMap(schema).toMap
+val dataTypeOf = getFieldMap(schema)

// NOTE:
//
@@ -242,9 +212,6 @@ private[sql] object ParquetFilters {
case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))

-case sources.In(name, valueSet) =>
-makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))

case sources.And(lhs, rhs) =>
// Here it is not safe to convert just one side if we do not understand the
// other side. Here is an example used to explain the reason.
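To show the net effect of removing makeInSet, the sketch below calls createFilter directly. It is hypothetical, not code from this patch: the CreateFilterSketch object and column names are invented, and it assumes compilation inside Spark's own org.apache.spark.sql.execution.datasources.parquet package, since ParquetFilters is private[sql]. A comparison such as GreaterThanOrEqual still converts to a Parquet FilterPredicate, while sources.In no longer has a conversion case, so createFilter yields None and Spark evaluates the IN predicate itself after the scan.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

object CreateFilterSketch {
  // Two plain Int columns; neither carries the optional-field marker, so both appear
  // in getFieldMap and are candidates for pushdown.
  val schema = StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", IntegerType)))

  // Still convertible: a simple comparison on a known column.
  val pushed = ParquetFilters.createFilter(schema, sources.GreaterThanOrEqual("a", 10))

  // No longer convertible after this change: with the In case gone, createFilter
  // returns None, and the predicate is evaluated by Spark rather than by Parquet.
  val notPushed = ParquetFilters.createFilter(schema, sources.In("b", Array[Any](1, 2, 3)))
}

The SPARK-11164 test removed below relied on that Parquet-side In filtering, which is presumably why it is dropped together with makeInSet.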
ParquetFilterSuite.scala
@@ -514,36 +514,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("SPARK-11164: test the parquet filter in") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)

// When a filter is pushed to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from the Parquet
// to make sure our filter pushdown work.
val df = spark.read.parquet(path).where("b in (0,2)")
assert(stripSparkFilter(df).count == 3)

val df1 = spark.read.parquet(path).where("not (b in (1))")
assert(stripSparkFilter(df1).count == 3)

val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 2)")
assert(stripSparkFilter(df2).count == 2)

val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 2)")
assert(stripSparkFilter(df3).count == 4)

val df4 = spark.read.parquet(path).where("not (a <= 2)")
assert(stripSparkFilter(df4).count == 3)
}
}
}
}

test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
// Here the schema becomes as below: