@@ -32,6 +32,7 @@ import org.apache.spark.annotation.{Evolving, Stable}
sealed abstract class Filter {
/**
* List of columns that are referenced by this filter.
+   * Note that, if a column name contains `dots`, it will be quoted to avoid confusion.
Member:
Do we already assume that these column names match the original source names?
I'm suddenly wondering whether this is safe for all data sources, like JDBC. Some DBMSs, such as PostgreSQL, are case-sensitive.

Member Author:
Parquet has some logic to handle what you described. I was wondering whether it's safe for ORC as well.

@HyukjinKwon (Member), Mar 9, 2020:
Okay, I was confused about the reviewing order. This will be a breaking change against a Stable API, as I noted in #27780 (comment) too. There are non-DBMS sources as well.

Contributor:
Is this a behavior change, or are we just clarifying existing behavior in the comment?

Member Author:
When there is a dot in a column name, the existing Filter does not quote it. As a result, to extend Filter to support nested columns, we need to quote names containing dots so that a literal dot can be distinguished from the dot used as the nested-column separator.

The behavior change only affects column names that contain a dot.
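For illustration, a minimal sketch of the quoting rule being described (quoteIfNeeded comes from CatalogV2Implicits; the exact backtick-escaping shown here is an assumption):

// Sketch: quote a name only when it contains a dot or a backtick,
// doubling embedded backticks as in SQL identifiers.
def quoteIfNeeded(name: String): String =
  if (name.contains(".") || name.contains("`")) {
    "`" + name.replace("`", "``") + "`"
  } else {
    name
  }

quoteIfNeeded("cint")  // "cint"    -- plain name, unchanged
quoteIfNeeded("c.int") // "`c.int`" -- literal dot, quoted to avoid ambiguity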

* @since 2.1.0
*/
def references: Array[String]
@@ -652,10 +652,11 @@ object DataSourceStrategy {
*/
object PushableColumn {
def unapply(e: Expression): Option[String] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
def helper(e: Expression) = e match {
case a: Attribute => Some(a.name)
case _ => None
}
-    helper(e)
+    helper(e).map(quoteIfNeeded)
Contributor:
OK, this is a behavior change.

Filter#references is an Array[String], so there is no confusion even if we don't quote. What's our plan for supporting nested fields?

Member Author:
Each element in Filter#references is a column that this Filter refers to, and there can be multiple of them. As a result, quoting column names that contain dots is still needed.
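For example, a sketch of the intended outcome (the filter values are illustrative, not from this PR's tests):

import org.apache.spark.sql.sources.{And, EqualTo, IsNotNull}

// Spark now builds the filter with the dotted name already backquoted,
// so a source can tell the top-level column `c.int` apart from a nested
// path c.int.
val filter = And(EqualTo("cint", 1), IsNotNull("`c.int`"))
filter.references // Array("cint", "`c.int`")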

Contributor:
Ah, sorry, I misread it as the v2 reference.

So our plan is to use dots to separate nested fields in the v1 reference. Is this a 3.1 feature, or do we plan to make it into 3.0?

Member Author:
I would like to make it into 3.0, as it lays a good foundation for features such as pushdown on columns containing dots and nested predicate pushdown. Also, I feel it's a better time to have a small breaking change in 3.0 than in 3.1. WDYT?

Contributor:
Is this really needed to support column names containing dots?

A few thoughts on rolling it out more smoothly (see the sketch after this comment):

  1. If we refer to a top-level column, don't quote it (so there is no breaking change).
  2. If we refer to a nested field, use a special encoding (a JSON array?).

If users use a v1 source with Spark 3.0 and Spark pushes down a filter with nested fields, the source would either fail or ignore the filter, since it doesn't recognize the JSON-array-encoded column name.

However, if we use dots to separate field names, then a v1 source can break when a pushed-down filter points to a top-level column whose name contains a dot.
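A sketch contrasting the two encodings under discussion (both helper names are hypothetical, not part of the PR):

// Proposal 2: encode a nested-field path as a JSON array. An old v1 source
// would not recognize this shape and would skip the filter rather than
// misread it.
def encodeAsJsonArray(path: Seq[String]): String =
  path.map(p => "\"" + p.replace("\"", "\\\"") + "\"").mkString("[", ",", "]")

// This PR's approach: join segments with dots, backquoting any segment that
// itself contains a dot. An old v1 source that splits on dots could misread it.
def encodeWithDots(path: Seq[String]): String =
  path.map(p => if (p.contains(".")) s"`$p`" else p).mkString(".")

encodeAsJsonArray(Seq("a", "b.c")) // ["a","b.c"]
encodeWithDots(Seq("a", "b.c"))    // a.`b.c`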

Contributor:
Implementation-wise, I think it's easier to use dots to separate field names, both for Spark and for source developers. I don't know how common it is to see top-level column names containing dots.

cc @brkyvz @rdblue @imback82

Member Author:
We support column names containing dots in ORC.

I feel a special encoding makes the implementation overly complicated. I believe that currently only ORC supports column names containing dots, so the impact is small.

Member:
@dbtsai, I thought Parquet also supports dots in column names, per #27780.
Also, the problem is that there are multiple external sources. Dots can be used to express namespace-like annotations, just like our SQL configurations.

A JSON array might not be an option either, because column names themselves can be JSON arrays:

scala> sql("""select 'a' as `["a", "'b"]`""")
res1: org.apache.spark.sql.DataFrame = [["a", "'b"]: string]

It's unlikely, but it's a possible breaking change.

The only option I can think of now is a SQL conf listing the data sources for which dots in column names should be backquoted (and not backquoted for nested column access).

Member Author:
@HyukjinKwon You are right. I'm just breaking #27780 into smaller PRs so it's easier to review. I chatted with @cloud-fan offline; I'll close this one and work on the bigger one to avoid confusion. Let's discuss how we want to handle the API compatibility in the other PR.

Member:
Thank you @dbtsai!

}
}
@@ -17,8 +17,10 @@

package org.apache.spark.sql.execution.datasources.orc

+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}
+import org.apache.spark.sql.types.StructType

/**
* Methods that can be shared when upgrading the built-in Hive.
@@ -45,4 +47,11 @@ trait OrcFiltersBase {
case _: AtomicType => true
case _ => false
}

+  /**
+   * The keys of the returned dataTypeMap are quoted if they contain `dots`.
+   */
+  protected[sql] def quotedDataTypeMap(schema: StructType): Map[String, DataType] = {
+    schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
+  }
}
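For illustration, what the new helper would produce for a schema containing a dotted name (a sketch; quotedDataTypeMap is protected[sql], so this is not callable from user code):

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
  .add("cint", IntegerType)
  .add("c.str", StringType)

// Keys match the quoted references that Spark now pushes down in Filters.
quotedDataTypeMap(schema)
// Map("cint" -> IntegerType, "`c.str`" -> StringType)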
@@ -34,6 +34,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources
import org.apache.spark.unsafe.types.UTF8String

@@ -55,7 +56,7 @@ class ParquetFilters(
// and it does not support to create filters for them.
val primitiveFields =
schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-      f.getName -> ParquetField(f.getName,
+      quoteIfNeeded(f.getName) -> ParquetField(f.getName,
Member:
Since Parquet is case-sensitive, could you check if we have a test case for this?

Member Author:
There is code handling this, and I will check if there is a test.

ParquetSchemaType(f.getOriginalType,
f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
}
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.orc.mapreduce.OrcInputFormat

import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
@@ -59,7 +60,7 @@ case class OrcScanBuilder(
// changed `hadoopConf` in executors.
OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
}
-    val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
_pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
}
filters
@@ -26,15 +26,19 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
val attrInts = Seq(
-    'cint.int
+    'cint.int,
+    Symbol("c.int").int
  ).zip(Seq(
-    "cint"
+    "cint",
+    "`c.int`" // single-level field that contains a `dot` in its name
  ))

val attrStrs = Seq(
-    'cstr.string
+    'cstr.string,
+    Symbol("c.str").string
  ).zip(Seq(
-    "cstr"
+    "cstr",
+    "`c.str`" // single-level field that contains a `dot` in its name
  ))

test("translate simple expression") { attrInts.zip(attrStrs)
@@ -65,7 +65,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
* Create ORC filter as a SearchArgument instance.
*/
def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
-    val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val dataTypeMap = quotedDataTypeMap(schema)
// Combines all convertible filters using `And` to produce a single conjunction
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
@@ -222,48 +222,39 @@ private[sql] object OrcFilters extends OrcFiltersBase {
// Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
// in order to distinguish predicate pushdown for nested columns.
expression match {
-      case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())
+      case EqualTo(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().equals(name, getType(name), castedValue).end())

-      case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())
+      case EqualNullSafe(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().nullSafeEquals(name, getType(name), castedValue).end())

-      case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())
+      case LessThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().lessThan(name, getType(name), castedValue).end())

-      case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())
+      case LessThanOrEqual(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().lessThanEquals(name, getType(name), castedValue).end())

-      case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())
+      case GreaterThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startNot().lessThanEquals(name, getType(name), castedValue).end())

-      case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())
+      case GreaterThanOrEqual(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startNot().lessThan(name, getType(name), castedValue).end())

-      case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
+      case IsNull(name) if isSearchableType(dataTypeMap(name)) =>
+        Some(builder.startAnd().isNull(name, getType(name)).end())

-      case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
+      case IsNotNull(name) if isSearchableType(dataTypeMap(name)) =>
+        Some(builder.startNot().isNull(name, getType(name)).end())

-      case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-        Some(builder.startAnd().in(quotedName, getType(attribute),
+      case In(name, values) if isSearchableType(dataTypeMap(name)) =>
+        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name)))
+        Some(builder.startAnd().in(name, getType(name),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())

case _ => None
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable

import org.apache.spark.SparkException
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._

@@ -65,7 +64,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
* Create ORC filter as a SearchArgument instance.
*/
def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
-    val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+    val dataTypeMap = quotedDataTypeMap(schema)
// Combines all convertible filters using `And` to produce a single conjunction
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
@@ -222,48 +221,39 @@ private[sql] object OrcFilters extends OrcFiltersBase {
// Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
// in order to distinguish predicate pushdown for nested columns.
expression match {
-      case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())
+      case EqualTo(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().equals(name, getType(name), castedValue).end())

-      case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())
+      case EqualNullSafe(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().nullSafeEquals(name, getType(name), castedValue).end())

-      case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())
+      case LessThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().lessThan(name, getType(name), castedValue).end())

-      case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())
+      case LessThanOrEqual(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().lessThanEquals(name, getType(name), castedValue).end())

-      case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())
+      case GreaterThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startNot().lessThanEquals(name, getType(name), castedValue).end())

-      case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())
+      case GreaterThanOrEqual(name, value) if isSearchableType(dataTypeMap(name)) =>
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startNot().lessThan(name, getType(name), castedValue).end())

-      case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
+      case IsNull(name) if isSearchableType(dataTypeMap(name)) =>
+        Some(builder.startAnd().isNull(name, getType(name)).end())

-      case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
+      case IsNotNull(name) if isSearchableType(dataTypeMap(name)) =>
+        Some(builder.startNot().isNull(name, getType(name)).end())

-      case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        val quotedName = quoteIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-        Some(builder.startAnd().in(quotedName, getType(attribute),
+      case In(name, values) if isSearchableType(dataTypeMap(name)) =>
+        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name)))
+        Some(builder.startAnd().in(name, getType(name),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())

case _ => None
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => DatasourceOrcFilters}
import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
import org.apache.spark.sql.hive.HiveUtils
@@ -73,7 +74,7 @@ private[orc] object OrcFilters extends Logging {
if (HiveUtils.isHive23) {
DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
} else {
-      val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+      val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
// Combines all convertible filters using `And` to produce a single conjunction
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
conjunctionOptional.map { conjunction =>