SupportsReportStatistics.java
@@ -23,10 +23,9 @@
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report statistics to Spark.
*
* Statistics are reported to the optimizer before a projection or any filters are pushed to the
* DataSourceReader. Implementations that return more accurate statistics based on projection and
* filters will not improve query performance until the planner can push operators before getting
* stats.
* Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader.
* Implementations that return more accurate statistics based on pushed operators will not improve
* query performance until the planner can push operators before getting stats.
*/
@InterfaceStability.Evolving
public interface SupportsReportStatistics extends DataSourceReader {
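For reference, a minimal sketch of a reader that reports statistics, assuming the v2 reader API touched by this diff (getStatistics returning sizeInBytes/numRows estimates); the class name and the metadata-based size estimate are illustrative only.

import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Illustrative sketch, not part of this diff: a reader that reports coarse statistics.
// `ExampleStatsReader` and the size estimate are made up; only the
// SupportsReportStatistics contract comes from the interface above.
abstract class ExampleStatsReader(totalSizeInBytes: Long)
  extends DataSourceReader with SupportsReportStatistics {

  override def readSchema(): StructType = StructType(Seq(StructField("id", LongType)))

  // As the Javadoc says, these statistics are requested before any operator is pushed,
  // so they can only be based on metadata known up front.
  override def getStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(totalSizeInBytes)
    override def numRows(): OptionalLong = OptionalLong.empty()
  }

  // Partition planning is intentionally left unimplemented (hence `abstract`);
  // it is not relevant to statistics reporting.
}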
DataSourceV2Relation.scala
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

/**
* A logical plan representing a data source v2 scan.
*
* @param source An instance of a [[DataSourceV2]] implementation.
* @param options The options for this scan. Used to create fresh [[DataSourceReader]].
* @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh
* [[DataSourceReader]].
*/
case class DataSourceV2Relation(
source: DataSourceV2,
output: Seq[AttributeReference],
options: Map[String, String],
userSpecifiedSchema: Option[StructType] = None)
userSpecifiedSchema: Option[StructType])
Contributor:
Why is this change necessary?

Contributor Author:
I can't find a place that uses this default value.

Contributor:
That's because there are few places that create v2 relations so far, but when SQL statements and other paths that don't allow you to supply your own schema are added, I think this will be more common. It's okay to remove it, but I don't see much value in the change and I like to keep non-functional changes to a minimum.

Contributor Author:
Yeah, I agree we should minimize non-functional changes, but removing dead code is also good to do. This is really a small change; if we do need the default value in the future, it's very easy to add it back.

Contributor:
Either way, it's up to you.

extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

import DataSourceV2Relation._
@@ -42,14 +49,7 @@ case class DataSourceV2Relation(

override def simpleString: String = "RelationV2 " + metadataString

lazy val v2Options: DataSourceOptions = makeV2Options(options)

def newReader: DataSourceReader = userSpecifiedSchema match {
case Some(userSchema) =>
source.asReadSupportWithSchema.createReader(userSchema, v2Options)
case None =>
source.asReadSupport.createReader(v2Options)
}
def newReader(): DataSourceReader = source.createReader(options, userSpecifiedSchema)

override def computeStats(): Statistics = newReader match {
case r: SupportsReportStatistics =>
@@ -139,83 +139,26 @@ object DataSourceV2Relation {
source.getClass.getSimpleName
}
}
}

private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
new DataSourceOptions(options.asJava)
}

private def schema(
source: DataSourceV2,
v2Options: DataSourceOptions,
userSchema: Option[StructType]): StructType = {
val reader = userSchema match {
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
source.asReadSupport.createReader(v2Options)
def createReader(
options: Map[String, String],
userSpecifiedSchema: Option[StructType]): DataSourceReader = {
val v2Options = new DataSourceOptions(options.asJava)
userSpecifiedSchema match {
case Some(s) =>
asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
asReadSupport.createReader(v2Options)
}
}
reader.readSchema()
}

def create(
source: DataSourceV2,
options: Map[String, String],
userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val output = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
DataSourceV2Relation(source, output, options, userSpecifiedSchema)
}

def pushRequiredColumns(
relation: DataSourceV2Relation,
reader: DataSourceReader,
struct: StructType): Seq[AttributeReference] = {
reader match {
case projectionSupport: SupportsPushDownRequiredColumns =>
projectionSupport.pruneColumns(struct)
// return the output columns from the relation that were projected
val attrMap = relation.output.map(a => a.name -> a).toMap
projectionSupport.readSchema().map(f => attrMap(f.name))
case _ =>
relation.output
}
}

def pushFilters(
reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
case r: SupportsPushDownCatalystFilters =>
val postScanFilters = r.pushCatalystFilters(filters.toArray)
val pushedFilters = r.pushedCatalystFilters()
(postScanFilters, pushedFilters)

case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
val translatedFilterToExpr = scala.collection.mutable.HashMap.empty[Filter, Expression]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = scala.collection.mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated = DataSourceStrategy.translateFilter(filterExpr)
if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
untranslatableExprs += filterExpr
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return them so Spark can plan an extra filter operator.
val postScanFilters =
r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)

(untranslatableExprs ++ postScanFilters, pushedFilters)

case _ => (filters, Nil)
}
userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
val reader = source.createReader(options, userSpecifiedSchema)
DataSourceV2Relation(
source, reader.readSchema().toAttributes, options, userSpecifiedSchema)
}
}
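For reference, a hedged usage sketch of the factory above; the source and option values are placeholders, and only create and newReader() come from this file.

// Illustrative sketch, not part of this diff; the source and options are placeholders.
val source: DataSourceV2 = ???  // any ReadSupport / ReadSupportWithSchema implementation
val relation = DataSourceV2Relation.create(
  source,
  options = Map("path" -> "/tmp/example"),
  userSpecifiedSchema = None)  // the default argument was removed, so None is passed explicitly

// newReader() builds a fresh reader from the stored options and schema on every call,
// so pushdown state set while planning one query never leaks into another plan.
val reader: DataSourceReader = relation.newReader()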
DataSourceV2Strategy.scala
@@ -17,51 +17,115 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.{execution, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
import scala.collection.mutable

import org.apache.spark.sql.{sources, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val projectSet = AttributeSet(project.flatMap(_.references))
val filterSet = AttributeSet(filters.flatMap(_.references))

val projection = if (filterSet.subsetOf(projectSet) &&
AttributeSet(relation.output) == projectSet) {
// When the required projection contains all of the filter columns and column pruning alone
// can produce the required projection, push the required projection.
// A final projection may still be needed if the data source produces a different column
// order or if it cannot prune all of the nested columns.
relation.output
} else {
// When there are filter columns not already in the required projection or when the required
// projection is more complicated than column pruning, base column pruning on the set of
// all columns needed by both.
(projectSet ++ filterSet).toSeq
}

val reader = relation.newReader
/**
* Pushes down filters to the data source reader.
*
* @return pushed filters and post-scan filters.
*/
private def pushFilters(
Contributor:
+1 for moving these functions. I considered it in the other commit, but decided to go with fewer changes. I like them here.

reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
case r: SupportsPushDownCatalystFilters =>
val postScanFilters = r.pushCatalystFilters(filters.toArray)
val pushedFilters = r.pushedCatalystFilters()
(pushedFilters, postScanFilters)

case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated = DataSourceStrategy.translateFilter(filterExpr)
if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
untranslatableExprs += filterExpr
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return them so Spark can plan an extra filter operator.
val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray)
.map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
(pushedFilters, untranslatableExprs ++ postScanFilters)

case _ => (Nil, filters)
}
}
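To make the pushFilters contract concrete, here is a hedged sketch of a reader implementing SupportsPushDownFilters; the class name and the IsNotNull-only policy are invented for illustration.

import org.apache.spark.sql.sources
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters}

// Illustrative sketch, not part of this diff: a reader that accepts only IsNotNull filters.
// The name and the pushdown policy are made up; the two overridden methods are the
// SupportsPushDownFilters contract consumed by pushFilters above.
abstract class NullSkippingReader extends DataSourceReader with SupportsPushDownFilters {
  private var pushed: Array[sources.Filter] = Array.empty

  override def pushFilters(filters: Array[sources.Filter]): Array[sources.Filter] = {
    val (accepted, rejected) = filters.partition(_.isInstanceOf[sources.IsNotNull])
    pushed = accepted
    // Whatever is returned here comes back as a post-scan filter, so Spark plans an
    // extra FilterExec on top of the scan to evaluate it again.
    rejected
  }

  // Reported back so the planner can record which filters were actually pushed.
  override def pushedFilters(): Array[sources.Filter] = pushed
}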

val output = DataSourceV2Relation.pushRequiredColumns(relation, reader,
projection.asInstanceOf[Seq[AttributeReference]].toStructType)
/**
* Applies column pruning to the data source, w.r.t. the references of the given expressions.
*
* @return new output attributes after column pruning.
*/
// TODO: nested column pruning.
private def pruneColumns(
reader: DataSourceReader,
relation: DataSourceV2Relation,
exprs: Seq[Expression]): Seq[AttributeReference] = {
reader match {
case r: SupportsPushDownRequiredColumns =>
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
if (neededOutput != relation.output) {
r.pruneColumns(neededOutput.toStructType)
val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
r.readSchema().toAttributes.map {
// We have to keep the attribute id during transformation.
a => a.withExprId(nameToAttr(a.name).exprId)
}
} else {
relation.output
}

case _ => relation.output
}
}
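Similarly, a hedged sketch of the reader side of column pruning; the class name and fullSchema parameter are placeholders.

import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Illustrative sketch, not part of this diff: a reader that supports column pruning.
// `PruningReader` and `fullSchema` are placeholders; pruneColumns/readSchema are the
// SupportsPushDownRequiredColumns contract that the strategy's pruneColumns relies on.
abstract class PruningReader(fullSchema: StructType)
  extends DataSourceReader with SupportsPushDownRequiredColumns {

  private var requiredSchema: StructType = fullSchema

  // Spark calls this with only the columns it needs; readSchema() must reflect the
  // pruned schema afterwards, because the strategy rebuilds output attributes from it.
  override def pruneColumns(required: StructType): Unit = {
    requiredSchema = required
  }

  override def readSchema(): StructType = requiredSchema
}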

val (postScanFilters, pushedFilters) = DataSourceV2Relation.pushFilters(reader, filters)

logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val reader = relation.newReader()
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFilters) = pushFilters(reader, filters)
val output = pruneColumns(reader, relation, project ++ postScanFilters)
logInfo(
s"""
|Pushing operators to ${relation.source.getClass}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
""".stripMargin)

val scan = DataSourceV2ScanExec(
output, relation.source, relation.options, pushedFilters, reader)

val filter = postScanFilters.reduceLeftOption(And)
val withFilter = filter.map(execution.FilterExec(_, scan)).getOrElse(scan)
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

val withProjection = if (withFilter.output != project) {
execution.ProjectExec(project, withFilter)
ProjectExec(project, withFilter)
} else {
withFilter
}
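As a rough, hedged illustration of what this strategy produces (the query, format name, and column names are made up):

// Illustrative sketch, not part of this diff: for a query such as
//   spark.read.format("exampleV2Source").load().filter($"i" > 1).select($"j")
// and a source that both pushes `i > 1` and returns it as a post-scan filter
// (as Parquet row-group filtering does), the resulting physical plan is roughly
//   ProjectExec [j]
//   +- FilterExec (i > 1)
//      +- DataSourceV2ScanExec [i, j]   // `i > 1` pushed, columns pruned to i and j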