From 65b1e7f34bb8c0b714f71799af764f0208b7f229 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 15 Aug 2020 16:26:19 -0700 Subject: [PATCH 1/7] Add basic. --- .../datasources/orc/OrcFileFormat.scala | 17 ++-- .../execution/datasources/orc/OrcUtils.scala | 14 +++ .../datasources/orc/OrcFilters.scala | 5 +- .../datasources/orc/OrcFilterSuite.scala | 86 ++++++++++++++++++- 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 4dff1ec7ebfb..51ca57e203f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -153,11 +153,6 @@ class OrcFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - if (sparkSession.sessionState.conf.orcFilterPushDown) { - OrcFilters.createFilter(dataSchema, filters).foreach { f => - OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames) - } - } val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) val sqlConf = sparkSession.sessionState.conf @@ -169,6 +164,8 @@ class OrcFileFormat val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown + val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles (file: PartitionedFile) => { val conf = broadcastedConf.value.value @@ -186,6 +183,16 @@ class OrcFileFormat if (resultedColPruneInfo.isEmpty) { Iterator.empty } else { + // ORC predicate pushdown + if (orcFilterPushDown) { + OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).map { fileSchema => + println(s"fileSchema: $fileSchema") + OrcFilters.createFilter(fileSchema, filters).foreach { f => + OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames) + } + } + } + val (requestedColIds, canPruneCols) = resultedColPruneInfo.get val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols, dataSchema, resultSchema, partitionSchema, conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 072e670081d1..264cf8165e13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -92,6 +92,20 @@ object OrcUtils extends Logging { } } + def readCatalystSchema( + file: Path, + conf: Configuration, + ignoreCorruptFiles: Boolean): Option[StructType] = { + readSchema(file, conf, ignoreCorruptFiles) match { + case Some(schema) => + Some(CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]) + + case None => + // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true. + None + } + } + /** * Reads ORC file schemas in multi-threaded manner, using native version of ORC. * This is visible for testing. diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 5273245fae45..0bcbaac0f9dc 100644 --- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -68,6 +68,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { * Create ORC filter as a SearchArgument instance. */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { + println(s"createFilter: $schema") val dataTypeMap = OrcFilters.getSearchableTypeMap(schema, SQLConf.get.caseSensitiveAnalysis) // Combines all convertible filters using `And` to produce a single conjunction val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters)) @@ -75,7 +76,9 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Then tries to build a single ORC `SearchArgument` for the conjunction predicate. // The input predicate is fully convertible. There should not be any empty result in the // following recursive method call `buildSearchArgument`. - buildSearchArgument(dataTypeMap, conjunction, newBuilder).build() + val f = buildSearchArgument(dataTypeMap, conjunction, newBuilder).build() + println(s"pushed: $f") + f } } diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 7df9f29b42f6..84e4c32ea378 100644 --- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, Column, DataFrame} +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -514,5 +514,87 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { ).get.toString } } + + test("SPARK-25557: case sensitivity in predicate pushdown") { + withTempPath { dir => + val count = 10 + val tableName = "spark_25557" + val tableDir1 = dir.getAbsoluteFile + "/table1" + + // Physical ORC files have both `A` and `a` fields. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a") + .write.mode("overwrite").orc(tableDir1) + } + + // Metastore table has both `A` and `a` fields too. + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + sql( + s""" + |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1' + """.stripMargin) + + checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1))) + + val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0")) + assert(actual1.count() == 1) + + val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0")) + assert(actual2.count() == 0) + } + + // Exception thrown for ambiguous case. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val e = intercept[AnalysisException] { + sql(s"select a from $tableName where a < 0").collect() + } + assert(e.getMessage.contains( + "Reference 'a' is ambiguous")) + } + } + + // Metastore table has only `A` field. + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql( + s""" + |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1' + """.stripMargin) + + val e = intercept[SparkException] { + sql(s"select A from $tableName where A < 0").collect() + } + assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( + """Found duplicate field(s) "A": [A, a] in case-insensitive mode""")) + } + } + + // Physical ORC files have only `A` field. + val tableDir2 = dir.getAbsoluteFile + "/table2" + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(count).repartition(count).selectExpr("id - 1 as A") + .write.mode("overwrite").orc(tableDir2) + } + + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql( + s""" + |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2' + """.stripMargin) + + checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1))) + + val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0")) + // TODO: ORC predicate pushdown should work under case-insensitive analysis. + sql(s"select a from $tableName where a < 0").show() + sql(s"select a from $tableName where a < 0").explain() + println(s"actual.count(): ${actual.count()}") + // assert(actual.count() == 1) + } + } + } + } } From e3949f16302366569cd0b8bc548560a603456aee Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 10 Aug 2020 15:16:44 -0700 Subject: [PATCH 2/7] Add Orc filter case sensitivity test. --- .../datasources/orc/OrcFiltersBase.scala | 10 ++- .../datasources/orc/OrcFilters.scala | 44 +++++++----- .../datasources/orc/OrcFilterSuite.scala | 66 ++++++++++++++++++ .../datasources/orc/OrcFilters.scala | 50 ++++++++------ .../datasources/orc/OrcFilterSuite.scala | 69 ++++++++++++++++++- 5 files changed, 194 insertions(+), 45 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala index b277b4da1cf8..ee0c08dd939a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala @@ -39,6 +39,8 @@ trait OrcFiltersBase { } } + case class OrcPrimitiveField(fieldName: String, fieldType: DataType) + /** * This method returns a map which contains ORC field name and data type. Each key * represents a column; `dots` are used as separators for nested columns. If any part @@ -49,19 +51,21 @@ trait OrcFiltersBase { */ protected[sql] def getSearchableTypeMap( schema: StructType, - caseSensitive: Boolean): Map[String, DataType] = { + caseSensitive: Boolean): Map[String, OrcPrimitiveField] = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper def getPrimitiveFields( fields: Seq[StructField], - parentFieldNames: Seq[String] = Seq.empty): Seq[(String, DataType)] = { + parentFieldNames: Seq[String] = Seq.empty): Seq[(String, OrcPrimitiveField)] = { fields.flatMap { f => f.dataType match { case st: StructType => getPrimitiveFields(st.fields, parentFieldNames :+ f.name) case BinaryType => None case _: AtomicType => - Some(((parentFieldNames :+ f.name).quoted, f.dataType)) + val fieldName = (parentFieldNames :+ f.name).quoted + val orcField = OrcPrimitiveField(fieldName, f.dataType) + Some((fieldName, orcField)) case _ => None } } diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index bc11bb8c1d5d..359e075bffa7 100644 --- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -81,7 +81,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { def convertibleFilters( schema: StructType, - dataTypeMap: Map[String, DataType], + dataTypeMap: Map[String, OrcPrimitiveField], filters: Seq[Filter]): Seq[Filter] = { import org.apache.spark.sql.sources._ @@ -179,7 +179,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { * @return the builder so far. */ private def buildSearchArgument( - dataTypeMap: Map[String, DataType], + dataTypeMap: Map[String, OrcPrimitiveField], expression: Filter, builder: Builder): Builder = { import org.apache.spark.sql.sources._ @@ -215,7 +215,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { * @return the builder so far. */ private def buildLeafSearchArgument( - dataTypeMap: Map[String, DataType], + dataTypeMap: Map[String, OrcPrimitiveField], expression: Filter, builder: Builder): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = @@ -228,38 +228,44 @@ private[sql] object OrcFilters extends OrcFiltersBase { // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). expression match { case EqualTo(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().equals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .equals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case EqualNullSafe(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().nullSafeEquals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .nullSafeEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case LessThan(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().lessThan(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case LessThanOrEqual(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().lessThanEquals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case GreaterThan(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startNot().lessThanEquals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startNot() + .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startNot().lessThan(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startNot() + .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case IsNull(name) if dataTypeMap.contains(name) => - Some(builder.startAnd().isNull(name, getType(name)).end()) + Some(builder.startAnd().isNull(dataTypeMap(name).fieldName, getType(name)).end()) case IsNotNull(name) if dataTypeMap.contains(name) => - Some(builder.startNot().isNull(name, getType(name)).end()) + Some(builder.startNot().isNull(dataTypeMap(name).fieldName, getType(name)).end()) case In(name, values) if dataTypeMap.contains(name) => - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name))) - Some(builder.startAnd().in(name, getType(name), + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name).fieldType)) + Some(builder.startAnd().in(dataTypeMap(name).fieldName, getType(name), castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) case _ => None diff --git a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 2643196cac56..2c296c833172 100644 --- a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, Column, DataFrame} @@ -513,5 +514,70 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { ).get.toString } } + + test("SPARK-25557: Case-insensitive field resolution for pushdown when reading ORC") { + import org.apache.spark.sql.sources._ + + def getOrcFilter( + schema: StructType, + filters: Seq[Filter], + caseSensitive: String): Option[SearchArgument] = { + var orcFilter: Option[SearchArgument] = None + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { + orcFilter = + OrcFilters.createFilter(schema, filters) + } + orcFilter + } + def testFilter( + schema: StructType, + filters: Seq[Filter], + expected: SearchArgument): Unit = { + val caseSensitiveFilters = getOrcFilter(schema, filters, "true") + val caseInsensitiveFilters = getOrcFilter(schema, filters, "false") + + assert(caseSensitiveFilters.isEmpty) + assert(caseInsensitiveFilters.isDefined) + + assert(caseInsensitiveFilters.get.getLeaves().size() > 0) + assert(caseInsensitiveFilters.get.getLeaves().size() == expected.getLeaves().size()) + (0 until expected.getLeaves().size()).foreach { index => + assert(caseInsensitiveFilters.get.getLeaves().get(index) == expected.getLeaves().get(index)) + } + } + + val schema1 = StructType(Seq(StructField("cint", IntegerType))) + testFilter(schema1, Seq(GreaterThan("CINT", 1)), + newBuilder.startNot() + .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema1, Seq( + And(GreaterThan("CINT", 1), EqualTo("Cint", 2))), + newBuilder.startAnd() + .startNot() + .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() + .equals("cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) + .`end`().build()) + + // Nested column case + val schema2 = StructType(Seq(StructField("a", + StructType(Seq(StructField("cint", IntegerType)))))) + + testFilter(schema2, Seq(GreaterThan("A.CINT", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq(GreaterThan("a.CINT", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq(GreaterThan("A.cint", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq( + And(GreaterThan("a.CINT", 1), EqualTo("a.Cint", 2))), + newBuilder.startAnd() + .startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() + .equals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) + .`end`().build()) + } } diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 0bcbaac0f9dc..9ad4f53cc608 100644 --- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -84,7 +84,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { def convertibleFilters( schema: StructType, - dataTypeMap: Map[String, DataType], + dataTypeMap: Map[String, OrcPrimitiveField], filters: Seq[Filter]): Seq[Filter] = { import org.apache.spark.sql.sources._ @@ -142,7 +142,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { /** * Get PredicateLeafType which is corresponding to the given DataType. */ - private def getPredicateLeafType(dataType: DataType) = dataType match { + private[sql] def getPredicateLeafType(dataType: DataType) = dataType match { case BooleanType => PredicateLeaf.Type.BOOLEAN case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG case FloatType | DoubleType => PredicateLeaf.Type.FLOAT @@ -182,7 +182,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { * @return the builder so far. */ private def buildSearchArgument( - dataTypeMap: Map[String, DataType], + dataTypeMap: Map[String, OrcPrimitiveField], expression: Filter, builder: Builder): Builder = { import org.apache.spark.sql.sources._ @@ -218,11 +218,11 @@ private[sql] object OrcFilters extends OrcFiltersBase { * @return the builder so far. */ private def buildLeafSearchArgument( - dataTypeMap: Map[String, DataType], + dataTypeMap: Map[String, OrcPrimitiveField], expression: Filter, builder: Builder): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = - getPredicateLeafType(dataTypeMap(attribute)) + getPredicateLeafType(dataTypeMap(attribute).fieldType) import org.apache.spark.sql.sources._ @@ -231,38 +231,46 @@ private[sql] object OrcFilters extends OrcFiltersBase { // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). expression match { case EqualTo(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().equals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .equals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case EqualNullSafe(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().nullSafeEquals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .nullSafeEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case LessThan(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().lessThan(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case LessThanOrEqual(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startAnd().lessThanEquals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startAnd() + .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case GreaterThan(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startNot().lessThanEquals(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startNot() + .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) => - val castedValue = castLiteralValue(value, dataTypeMap(name)) - Some(builder.startNot().lessThan(name, getType(name), castedValue).end()) + val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType) + Some(builder.startNot() + .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end()) case IsNull(name) if dataTypeMap.contains(name) => - Some(builder.startAnd().isNull(name, getType(name)).end()) + Some(builder.startAnd() + .isNull(dataTypeMap(name).fieldName, getType(name)).end()) case IsNotNull(name) if dataTypeMap.contains(name) => - Some(builder.startNot().isNull(name, getType(name)).end()) + Some(builder.startNot() + .isNull(dataTypeMap(name).fieldName, getType(name)).end()) case In(name, values) if dataTypeMap.contains(name) => - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name))) - Some(builder.startAnd().in(name, getType(name), + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name).fieldType)) + Some(builder.startAnd().in(dataTypeMap(name).fieldName, getType(name), castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) case _ => None diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 84e4c32ea378..bc03280b0dc6 100644 --- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row} @@ -590,11 +591,75 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { // TODO: ORC predicate pushdown should work under case-insensitive analysis. sql(s"select a from $tableName where a < 0").show() sql(s"select a from $tableName where a < 0").explain() - println(s"actual.count(): ${actual.count()}") - // assert(actual.count() == 1) + assert(actual.count() == 1) } } } } + + test("SPARK-25557: Case-insensitive field resolution for pushdown when reading ORC") { + import org.apache.spark.sql.sources._ + + def getOrcFilter( + schema: StructType, + filters: Seq[Filter], + caseSensitive: String): Option[SearchArgument] = { + var orcFilter: Option[SearchArgument] = None + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { + orcFilter = + OrcFilters.createFilter(schema, filters) + } + orcFilter + } + def testFilter( + schema: StructType, + filters: Seq[Filter], + expected: SearchArgument): Unit = { + val caseSensitiveFilters = getOrcFilter(schema, filters, "true") + val caseInsensitiveFilters = getOrcFilter(schema, filters, "false") + + assert(caseSensitiveFilters.isEmpty) + assert(caseInsensitiveFilters.isDefined) + + assert(caseInsensitiveFilters.get.getLeaves().size() > 0) + assert(caseInsensitiveFilters.get.getLeaves().size() == expected.getLeaves().size()) + (0 until expected.getLeaves().size()).foreach { index => + assert(caseInsensitiveFilters.get.getLeaves().get(index) == expected.getLeaves().get(index)) + } + } + + val schema1 = StructType(Seq(StructField("cint", IntegerType))) + testFilter(schema1, Seq(GreaterThan("CINT", 1)), + newBuilder.startNot() + .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema1, Seq( + And(GreaterThan("CINT", 1), EqualTo("Cint", 2))), + newBuilder.startAnd() + .startNot() + .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() + .equals("cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) + .`end`().build()) + + // Nested column case + val schema2 = StructType(Seq(StructField("a", + StructType(Seq(StructField("cint", IntegerType)))))) + + testFilter(schema2, Seq(GreaterThan("A.CINT", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq(GreaterThan("a.CINT", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq(GreaterThan("A.cint", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq( + And(GreaterThan("a.CINT", 1), EqualTo("a.Cint", 2))), + newBuilder.startAnd() + .startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() + .equals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) + .`end`().build()) + } } From a3d4bc804a59fd302aae44ac13b2e5b3519d9ca7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 15 Aug 2020 21:56:00 -0700 Subject: [PATCH 3/7] For DSv2. --- .../datasources/orc/OrcFileFormat.scala | 1 - .../v2/orc/OrcPartitionReaderFactory.scala | 20 +++++++++++++++++-- .../datasources/v2/orc/OrcScan.scala | 2 +- .../datasources/v2/orc/OrcScanBuilder.scala | 5 ----- .../datasources/orc/OrcFilters.scala | 5 +---- .../datasources/orc/OrcFilterSuite.scala | 2 -- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 51ca57e203f0..69badb4f7d59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -186,7 +186,6 @@ class OrcFileFormat // ORC predicate pushdown if (orcFilterPushDown) { OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).map { fileSchema => - println(s"fileSchema: $fileSchema") OrcFilters.createFilter(fileSchema, filters).foreach { f => OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 7f25f7bd135f..b19530e6476d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -31,9 +31,10 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcUtils} +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{AtomicType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -52,10 +53,13 @@ case class OrcPartitionReaderFactory( broadcastedConf: Broadcast[SerializableConfiguration], dataSchema: StructType, readDataSchema: StructType, - partitionSchema: StructType) extends FilePartitionReaderFactory { + partitionSchema: StructType, + filters: Array[Filter]) extends FilePartitionReaderFactory { private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields) private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val capacity = sqlConf.orcVectorizedReaderBatchSize + private val orcFilterPushDown = sqlConf.orcFilterPushDown + private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -63,6 +67,16 @@ case class OrcPartitionReaderFactory( resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) } + private def pushDownPredicates(filePath: Path, conf: Configuration): Unit = { + if (orcFilterPushDown) { + OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).map { fileSchema => + OrcFilters.createFilter(fileSchema, filters).foreach { f => + OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames) + } + } + } + } + override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value @@ -70,6 +84,8 @@ case class OrcPartitionReaderFactory( val filePath = new Path(new URI(file.filePath)) + pushDownPredicates(filePath, conf) + val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) val resultedColPruneInfo = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 38b8ced51a14..1710abed57b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -48,7 +48,7 @@ case class OrcScan( // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema) + dataSchema, readDataSchema, readPartitionSchema, pushedFilters) } override def equals(obj: Any): Boolean = obj match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 0330dacffa58..2f9387532c25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -56,11 +56,6 @@ case class OrcScanBuilder( override def pushFilters(filters: Array[Filter]): Array[Filter] = { if (sparkSession.sessionState.conf.orcFilterPushDown) { - OrcFilters.createFilter(schema, filters).foreach { f => - // The pushed filters will be set in `hadoopConf`. After that, we can simply use the - // changed `hadoopConf` in executors. - OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames) - } val dataTypeMap = OrcFilters.getSearchableTypeMap(schema, SQLConf.get.caseSensitiveAnalysis) _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray } diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 9ad4f53cc608..9acb04c8ca37 100644 --- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -68,7 +68,6 @@ private[sql] object OrcFilters extends OrcFiltersBase { * Create ORC filter as a SearchArgument instance. */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { - println(s"createFilter: $schema") val dataTypeMap = OrcFilters.getSearchableTypeMap(schema, SQLConf.get.caseSensitiveAnalysis) // Combines all convertible filters using `And` to produce a single conjunction val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters)) @@ -76,9 +75,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { // Then tries to build a single ORC `SearchArgument` for the conjunction predicate. // The input predicate is fully convertible. There should not be any empty result in the // following recursive method call `buildSearchArgument`. - val f = buildSearchArgument(dataTypeMap, conjunction, newBuilder).build() - println(s"pushed: $f") - f + buildSearchArgument(dataTypeMap, conjunction, newBuilder).build() } } diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index bc03280b0dc6..e900bf17f73d 100644 --- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -589,8 +589,6 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0")) // TODO: ORC predicate pushdown should work under case-insensitive analysis. - sql(s"select a from $tableName where a < 0").show() - sql(s"select a from $tableName where a < 0").explain() assert(actual.count() == 1) } } From 2f010de8f8530aa51b3568f5dd74c77dc2bb3af4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 17 Aug 2020 20:43:08 -0700 Subject: [PATCH 4/7] Make test consistent. --- .../datasources/orc/OrcFilterSuite.scala | 132 +++++++++--------- 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index c11eb7476be3..e159a0588dff 100644 --- a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -515,72 +515,6 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { } } - test("SPARK-32646: Case-insensitive field resolution for pushdown when reading ORC") { - import org.apache.spark.sql.sources._ - - def getOrcFilter( - schema: StructType, - filters: Seq[Filter], - caseSensitive: String): Option[SearchArgument] = { - var orcFilter: Option[SearchArgument] = None - withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { - orcFilter = - OrcFilters.createFilter(schema, filters) - } - orcFilter - } - - def testFilter( - schema: StructType, - filters: Seq[Filter], - expected: SearchArgument): Unit = { - val caseSensitiveFilters = getOrcFilter(schema, filters, "true") - val caseInsensitiveFilters = getOrcFilter(schema, filters, "false") - - assert(caseSensitiveFilters.isEmpty) - assert(caseInsensitiveFilters.isDefined) - - assert(caseInsensitiveFilters.get.getLeaves().size() > 0) - assert(caseInsensitiveFilters.get.getLeaves().size() == expected.getLeaves().size()) - (0 until expected.getLeaves().size()).foreach { index => - assert(caseInsensitiveFilters.get.getLeaves().get(index) == expected.getLeaves().get(index)) - } - } - - val schema1 = StructType(Seq(StructField("cint", IntegerType))) - testFilter(schema1, Seq(GreaterThan("CINT", 1)), - newBuilder.startNot() - .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) - testFilter(schema1, Seq( - And(GreaterThan("CINT", 1), EqualTo("Cint", 2))), - newBuilder.startAnd() - .startNot() - .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() - .equals("cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) - .`end`().build()) - - // Nested column case - val schema2 = StructType(Seq(StructField("a", - StructType(Seq(StructField("cint", IntegerType)))))) - - testFilter(schema2, Seq(GreaterThan("A.CINT", 1)), - newBuilder.startNot() - .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) - testFilter(schema2, Seq(GreaterThan("a.CINT", 1)), - newBuilder.startNot() - .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) - testFilter(schema2, Seq(GreaterThan("A.cint", 1)), - newBuilder.startNot() - .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) - testFilter(schema2, Seq( - And(GreaterThan("a.CINT", 1), EqualTo("a.Cint", 2))), - newBuilder.startAnd() - .startNot() - .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() - .equals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) - .`end`().build()) - } - test("SPARK-32622: case sensitivity in predicate pushdown") { withTempPath { dir => val count = 10 @@ -672,5 +606,71 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { } } } + + test("SPARK-32646: Case-insensitive field resolution for pushdown when reading ORC") { + import org.apache.spark.sql.sources._ + + def getOrcFilter( + schema: StructType, + filters: Seq[Filter], + caseSensitive: String): Option[SearchArgument] = { + var orcFilter: Option[SearchArgument] = None + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { + orcFilter = + OrcFilters.createFilter(schema, filters) + } + orcFilter + } + + def testFilter( + schema: StructType, + filters: Seq[Filter], + expected: SearchArgument): Unit = { + val caseSensitiveFilters = getOrcFilter(schema, filters, "true") + val caseInsensitiveFilters = getOrcFilter(schema, filters, "false") + + assert(caseSensitiveFilters.isEmpty) + assert(caseInsensitiveFilters.isDefined) + + assert(caseInsensitiveFilters.get.getLeaves().size() > 0) + assert(caseInsensitiveFilters.get.getLeaves().size() == expected.getLeaves().size()) + (0 until expected.getLeaves().size()).foreach { index => + assert(caseInsensitiveFilters.get.getLeaves().get(index) == expected.getLeaves().get(index)) + } + } + + val schema1 = StructType(Seq(StructField("cint", IntegerType))) + testFilter(schema1, Seq(GreaterThan("CINT", 1)), + newBuilder.startNot() + .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema1, Seq( + And(GreaterThan("CINT", 1), EqualTo("Cint", 2))), + newBuilder.startAnd() + .startNot() + .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() + .equals("cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) + .`end`().build()) + + // Nested column case + val schema2 = StructType(Seq(StructField("a", + StructType(Seq(StructField("cint", IntegerType)))))) + + testFilter(schema2, Seq(GreaterThan("A.CINT", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq(GreaterThan("a.CINT", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq(GreaterThan("A.cint", 1)), + newBuilder.startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build()) + testFilter(schema2, Seq( + And(GreaterThan("a.CINT", 1), EqualTo("a.Cint", 2))), + newBuilder.startAnd() + .startNot() + .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`() + .equals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 2L) + .`end`().build()) + } } From 090747d4a9d1540c4b65e45f960c926a23d76b84 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 17 Aug 2020 23:52:55 -0700 Subject: [PATCH 5/7] Push down predicate for ColumnarReader. --- .../datasources/v2/orc/OrcPartitionReaderFactory.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index b19530e6476d..1f38128e98fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -132,6 +132,8 @@ case class OrcPartitionReaderFactory( val filePath = new Path(new URI(file.filePath)) + pushDownPredicates(filePath, conf) + val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) val resultedColPruneInfo = From 5807075d9befbeb91f8852f208c7a7fbb4679aa5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 24 Aug 2020 08:58:16 -0700 Subject: [PATCH 6/7] Fix hive 1.2. --- .../spark/sql/execution/datasources/orc/OrcFilters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 359e075bffa7..7eed502469bc 100644 --- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -139,7 +139,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { /** * Get PredicateLeafType which is corresponding to the given DataType. */ - private def getPredicateLeafType(dataType: DataType) = dataType match { + private[sql] def getPredicateLeafType(dataType: DataType) = dataType match { case BooleanType => PredicateLeaf.Type.BOOLEAN case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG case FloatType | DoubleType => PredicateLeaf.Type.FLOAT @@ -219,7 +219,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { expression: Filter, builder: Builder): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = - getPredicateLeafType(dataTypeMap(attribute)) + getPredicateLeafType(dataTypeMap(attribute).fieldType) import org.apache.spark.sql.sources._ From 50f00379ef043a81448b4fd6e424cf7f715ebf96 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 24 Aug 2020 20:21:43 -0700 Subject: [PATCH 7/7] Address comment. --- .../apache/spark/sql/execution/datasources/orc/OrcFilters.scala | 2 +- .../apache/spark/sql/execution/datasources/orc/OrcFilters.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 7eed502469bc..0e657bfe6623 100644 --- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -139,7 +139,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { /** * Get PredicateLeafType which is corresponding to the given DataType. */ - private[sql] def getPredicateLeafType(dataType: DataType) = dataType match { + def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match { case BooleanType => PredicateLeaf.Type.BOOLEAN case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG case FloatType | DoubleType => PredicateLeaf.Type.FLOAT diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 9acb04c8ca37..9511fc31f4ac 100644 --- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -139,7 +139,7 @@ private[sql] object OrcFilters extends OrcFiltersBase { /** * Get PredicateLeafType which is corresponding to the given DataType. */ - private[sql] def getPredicateLeafType(dataType: DataType) = dataType match { + def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match { case BooleanType => PredicateLeaf.Type.BOOLEAN case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG case FloatType | DoubleType => PredicateLeaf.Type.FLOAT