@@ -22,78 +22,61 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

/**
* A logical plan representing a data source v2 scan.
*
* @param source An instance of a [[DataSourceV2]] implementation.
* @param options The options for this scan. Used to create fresh [[DataSourceReader]].
* @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh
* [[DataSourceReader]].
* @param optimizedReader An optimized [[DataSourceReader]] produced by the optimizer rule
* [[PushDownOperatorsToDataSource]]. It is a temporary value and is excluded
* from the equality definition of this class, to avoid re-applying operator
* pushdown and re-creating the [[DataSourceReader]] when we copy the relation
* during query plan transformation.
* @param pushedFilters The filters that are pushed down to the data source.
*/
case class DataSourceV2Relation(
output: Seq[AttributeReference],
source: DataSourceV2,
options: Map[String, String],
projection: Seq[AttributeReference],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None)
userSpecifiedSchema: Option[StructType],
// TODO: This is a workaround for the issue that Spark puts statistics in the logical plans.
// A `DataSourceReader` must be created during logical phase to report statistics.
// In the future we should move statistics to the physical plans and create the
// `DataSourceReader` on the fly during planning.
optimizedReader: Option[DataSourceReader] = None,
pushedFilters: Seq[Expression] = Nil)
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

import DataSourceV2Relation._

override def simpleString: String = "RelationV2 " + metadataString

override lazy val schema: StructType = reader.readSchema()
def createFreshReader: DataSourceReader = source.createReader(options, userSpecifiedSchema)

override lazy val output: Seq[AttributeReference] = {
// use the projection attributes to avoid assigning new ids. fields that are not projected
// will be assigned new ids, which is okay because they are not projected.
val attrMap = projection.map(a => a.name -> a).toMap
schema.map(f => attrMap.getOrElse(f.name,
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
}
def reader: DataSourceReader = optimizedReader.getOrElse(createFreshReader)

private lazy val v2Options: DataSourceOptions = makeV2Options(options)

// postScanFilters: filters that need to be evaluated after the scan.
// pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
// Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
Review comment (Member): Nit: There can be overlap between postScanFilters and pushedFilters.

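To make that overlap concrete: a reader may accept a predicate for pushdown (for example, to prune Parquet row groups using their min/max statistics) and still report it back so Spark re-evaluates it after the scan, because pruning only skips whole row groups. A minimal sketch of such a reader, using the SupportsPushDownFilters API already imported above; RowGroupPruningReader and its behaviour are illustrative only, not part of this patch:

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters

// Sketch only: the data-reading side of DataSourceReader is left abstract here,
// since its factory methods are not the point of this example.
abstract class RowGroupPruningReader extends SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Accept every filter for row-group pruning...
    pushed = filters
    // ...but hand them all back as well: pruning is best-effort at row-group
    // granularity, so Spark must still evaluate the same predicates after the
    // scan. This is exactly the overlap between pushedFilters and
    // postScanFilters noted above.
    filters
  }

  override def pushedFilters(): Array[Filter] = pushed
}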
lazy val (
reader: DataSourceReader,
postScanFilters: Seq[Expression],
pushedFilters: Seq[Expression]) = {
val newReader = userSpecifiedSchema match {
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
source.asReadSupport.createReader(v2Options)
}

DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)

val (postScanFilters, pushedFilters) = filters match {
case Some(filterSeq) =>
DataSourceV2Relation.pushFilters(newReader, filterSeq)
case _ =>
(Nil, Nil)
}
logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
Review comment (Member): Add logDebug in new code?

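For reference, adopting that suggestion would simply demote this logInfo and the pushed-filters one just below, e.g. (a sketch of the suggestion, not what the patch currently does):

logDebug(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
logDebug(s"Pushed Filters: ${pushedFilters.mkString(", ")}")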
logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
override def simpleString: String = "RelationV2 " + metadataString

(newReader, postScanFilters, pushedFilters)
override def equals(other: Any): Boolean = other match {
case other: DataSourceV2Relation =>
output == other.output && source.getClass == other.source.getClass &&
options == other.options && userSpecifiedSchema == other.userSpecifiedSchema &&
pushedFilters == other.pushedFilters
case _ => false
}

override def doCanonicalize(): LogicalPlan = {
val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]

// override output with canonicalized output to avoid attempting to configure a reader
val canonicalOutput: Seq[AttributeReference] = this.output
.map(a => QueryPlan.normalizeExprId(a, projection))

new DataSourceV2Relation(c.source, c.options, c.projection) {
override lazy val output: Seq[AttributeReference] = canonicalOutput
}
override def hashCode(): Int = {
Seq(output, source.getClass, options, userSpecifiedSchema, pushedFilters).hashCode()
}

// `LogicalPlanStats` caches the computed statistics, so we are fine here even if the
// `optimizedReader` is None. We won't create the `DataSourceReader` many times.
override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -102,9 +85,7 @@ case class DataSourceV2Relation(
}

override def newInstance(): DataSourceV2Relation = {
// projection is used to maintain id assignment.
// if projection is not set, use output so the copy is not equal to the original
copy(projection = projection.map(_.newInstance()))
copy(output = output.map(_.newInstance()))
}
}

@@ -150,111 +131,57 @@ case class StreamingDataSourceV2Relation(
}

object DataSourceV2Relation {

private implicit class SourceHelpers(source: DataSourceV2) {
def asReadSupport: ReadSupport = {
source match {
case support: ReadSupport =>
support
case _: ReadSupportWithSchema =>
// this method is only called if there is no user-supplied schema. if there is no
// user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
}

def asReadSupportWithSchema: ReadSupportWithSchema = {
source match {
case support: ReadSupportWithSchema =>
support
case _: ReadSupport =>
throw new AnalysisException(
s"Data source does not support user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
private def asReadSupport: ReadSupport = source match {
Review comment (Contributor Author @cloud-fan, May 14, 2018): Since I'm touching the code around here, I removed the outermost { } and fixed the indentation. I can revert it if people think it isn't worth it.

case support: ReadSupport =>
support
case _: ReadSupportWithSchema =>
// this method is only called if there is no user-supplied schema. if there is no
// user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}

def name: String = {
source match {
case registered: DataSourceRegister =>
registered.shortName()
case _ =>
source.getClass.getSimpleName
}
private def asReadSupportWithSchema: ReadSupportWithSchema = source match {
case support: ReadSupportWithSchema =>
support
case _: ReadSupport =>
throw new AnalysisException(
s"Data source does not support user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
}
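A quick aside on the SourceHelpers pattern used throughout this object: it is a private implicit class, so plain DataSourceV2 values pick up helpers such as createReader inside this object and in the companion case class (which does import DataSourceV2Relation._). A self-contained sketch of the same enrichment pattern, with a hypothetical MinimalSourceOps wrapper standing in for SourceHelpers:

import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.DataSourceV2

object ImplicitEnrichmentSketch {
  // Mirrors SourceHelpers: adds a `name` method to any DataSourceV2 without
  // touching the DataSourceV2 interface itself.
  private implicit class MinimalSourceOps(source: DataSourceV2) {
    def name: String = source match {
      case registered: DataSourceRegister => registered.shortName()
      case _ => source.getClass.getSimpleName
    }
  }

  // Within this object the wrapper is in scope, so `source.name` resolves
  // through MinimalSourceOps.
  def describe(source: DataSourceV2): String = s"RelationV2 over ${source.name}"
}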

private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
new DataSourceOptions(options.asJava)
}

private def schema(
source: DataSourceV2,
v2Options: DataSourceOptions,
userSchema: Option[StructType]): StructType = {
val reader = userSchema match {
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
private def name: String = source match {
case registered: DataSourceRegister =>
registered.shortName()
case _ =>
source.asReadSupport.createReader(v2Options)
source.getClass.getSimpleName
}

def createReader(
options: Map[String, String],
userSpecifiedSchema: Option[StructType]): DataSourceReader = {
val v2Options = new DataSourceOptions(options.asJava)
userSpecifiedSchema match {
case Some(s) =>
asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
asReadSupport.createReader(v2Options)
}
}
reader.readSchema()
}

def create(
source: DataSourceV2,
options: Map[String, String],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
DataSourceV2Relation(source, options, projection, filters, userSpecifiedSchema)
}

private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
reader match {
case projectionSupport: SupportsPushDownRequiredColumns =>
projectionSupport.pruneColumns(struct)
case _ =>
}
}

private def pushFilters(
reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
case r: SupportsPushDownCatalystFilters =>
val postScanFilters = r.pushCatalystFilters(filters.toArray)
val pushedFilters = r.pushedCatalystFilters()
(postScanFilters, pushedFilters)

case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
val translatedFilterToExpr = scala.collection.mutable.HashMap.empty[Filter, Expression]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = scala.collection.mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated = DataSourceStrategy.translateFilter(filterExpr)
if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
untranslatableExprs += filterExpr
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return them so Spark can plan an extra filter operator.
val postScanFilters =
r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)

(untranslatableExprs ++ postScanFilters, pushedFilters)

case _ => (filters, Nil)
}
userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
val reader = source.createReader(options, userSpecifiedSchema)
DataSourceV2Relation(
reader.readSchema().toAttributes, source, options, userSpecifiedSchema)
}
}
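Putting the pieces of the reworked relation together, the intended flow is: analysis creates the relation once (its output comes straight from reader.readSchema().toAttributes), and the optimizer rule PushDownOperatorsToDataSource later creates a fresh reader, pushes operators into it, and copies the relation with optimizedReader and pushedFilters filled in. A rough sketch under those assumptions, with the optimizer rule elided; mySource is a hypothetical ReadSupport implementation and the code is assumed to live in the same package as DataSourceV2Relation:

import org.apache.spark.sql.sources.v2.DataSourceV2

object DataSourceV2RelationUsageSketch {
  def planScan(mySource: DataSourceV2): DataSourceV2Relation = {
    // Analysis time: create the relation once; its output attributes come
    // straight from reader.readSchema().toAttributes inside create().
    val relation = DataSourceV2Relation.create(
      mySource,
      options = Map("path" -> "/tmp/data"),
      userSpecifiedSchema = None)

    // Optimization time (normally done by PushDownOperatorsToDataSource):
    // create a fresh reader, push operators into it, and keep it on the copy so
    // computeStats can report statistics without re-creating readers.
    val pushedDownReader = relation.createFreshReader
    // ... operator pushdown into pushedDownReader would happen here ...
    relation.copy(optimizedReader = Some(pushedDownReader), pushedFilters = Nil)
  }
}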