@@ -34,8 +34,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
protected val namespaces: util.Map[List[String], Map[String, String]] =
new ConcurrentHashMap[List[String], Map[String, String]]()

protected val tables: util.Map[Identifier, InMemoryTable] =
new ConcurrentHashMap[Identifier, InMemoryTable]()
protected val tables: util.Map[Identifier, Table] =
new ConcurrentHashMap[Identifier, Table]()

private val invalidatedTables: util.Set[Identifier] = ConcurrentHashMap.newKeySet()

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read;

import org.apache.spark.annotation.Unstable;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;

/**
* A trait that should be implemented by V1 DataSources that would like to leverage the DataSource
* V2 read code paths.
*
* This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and
* will be removed in a future Spark release.
*
* @since 3.0.0
*/
@Unstable
public interface V1Scan extends Scan {
Contributor: Can we not push filters and schema pruning down to this scan? We support these in the V1 APIs. Then you can avoid the pushed filters tag.

Contributor Author: The idea is the same as the v1 write fallback API, which also relies on the v2 API to configure the write. It's better to leverage the v2 infrastructure as much as we can; e.g. we may improve the v2 pushdown to push down more operators that v1 doesn't support.


/**
* Create a `BaseRelation` with `TableScan` that can scan data from DataSource v1 to RDD[Row].
*
* @since 3.0.0
*/
<T extends BaseRelation & TableScan> T toV1TableScan(SQLContext context);
Contributor: It kind of seems weird to me that we're introducing new APIs that use deprecated APIs.

Contributor Author: We haven't marked SQLContext as deprecated yet.

}
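
A minimal implementation sketch of this interface, not taken from the PR: the class name LegacyScan and its schema argument are illustrative. One way to satisfy the Java signature from Scala is to return an anonymous BaseRelation with TableScan and cast it to the type parameter.

// Hypothetical implementation sketch (not from the PR): a v1 source exposing its legacy
// TableScan through the new V1Scan API. LegacyScan and tableSchema are illustrative names.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.connector.read.V1Scan
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

class LegacyScan(tableSchema: StructType) extends V1Scan {

  override def readSchema(): StructType = tableSchema

  // Called by the planner when it decides to fall back to the v1 read path.
  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = context
      override def schema: StructType = tableSchema
      // A real source would build its RDD[Row] here; an empty RDD keeps the sketch runnable.
      override def buildScan(): RDD[Row] = context.sparkContext.emptyRDD[Row]
    }.asInstanceOf[T]
  }
}

The returned relation's schema must match readSchema(); the planner verifies this when it builds the fallback scan (see the consistency check in the DataSourceV2Strategy hunk further down).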
@@ -15,11 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.sql.connector.write
package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.{Experimental, Unstable}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.annotation.Unstable;
import org.apache.spark.sql.sources.InsertableRelation;

/**
* A trait that should be implemented by V1 DataSources that would like to leverage the DataSource
@@ -32,10 +31,8 @@
*
* @since 3.0.0
*/
@Experimental
@Unstable
trait V1WriteBuilder extends WriteBuilder {

public interface V1WriteBuilder extends WriteBuilder {
/**
* Creates an InsertableRelation that allows appending a DataFrame to a
* a destination (using data source-specific parameters). The insert method will only be
@@ -44,11 +41,5 @@
*
* @since 3.0.0
*/
def buildForV1Write(): InsertableRelation

// These methods cannot be implemented by a V1WriteBuilder. The super class will throw
// an UnsupportedOperationException.
override final def buildForBatch(): BatchWrite = super.buildForBatch()
Contributor Author: A Java interface can't have final methods.


override final def buildForStreaming(): StreamingWrite = super.buildForStreaming()
InsertableRelation buildForV1Write();
}
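
A matching sketch for the write side, also hypothetical (the class name and the body of insert are illustrative): only buildForV1Write needs an implementation, since the buildForBatch/buildForStreaming defaults inherited from WriteBuilder throw, as the comment removed above notes.

// Hypothetical implementation sketch (not from the PR): a v1 source plugging its
// InsertableRelation into the v2 write path. LegacyWriteBuilder is an illustrative name.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.write.V1WriteBuilder
import org.apache.spark.sql.sources.InsertableRelation

class LegacyWriteBuilder extends V1WriteBuilder {
  override def buildForV1Write(): InsertableRelation = new InsertableRelation {
    override def insert(data: DataFrame, overwrite: Boolean): Unit = {
      // A real source would write `data` to its backing store here, honoring `overwrite`.
      data.foreach(_ => ())  // placeholder side effect
    }
  }
}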
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -27,7 +26,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf

class SparkPlanner(
val sparkContext: SparkContext,
val session: SparkSession,
Member: Wow. Finally. :)

val conf: SQLConf,
val experimentalMethods: ExperimentalMethods)
extends SparkStrategies {
@@ -39,7 +38,7 @@ class SparkPlanner(
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
DataSourceV2Strategy ::
new DataSourceV2Strategy(session) ::
FileSourceStrategy ::
DataSourceStrategy(conf) ::
SpecialLimits ::
@@ -570,7 +570,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

protected lazy val singleRowRdd = sparkContext.parallelize(Seq(InternalRow()), 1)
protected lazy val singleRowRdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)

object InMemoryScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -409,14 +409,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
relation: LogicalRelation,
output: Seq[Attribute],
rdd: RDD[Row]): RDD[InternalRow] = {
if (relation.relation.needConversion) {
val converters = RowEncoder(StructType.fromAttributes(output))
rdd.mapPartitions { iterator =>
iterator.map(converters.toRow)
}
} else {
rdd.asInstanceOf[RDD[InternalRow]]
}
DataSourceStrategy.toCatalystRDD(relation.relation, output, rdd)
}

/**
@@ -624,4 +617,21 @@ object DataSourceStrategy {

(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

/**
* Convert RDD of Row into RDD of InternalRow with objects in catalyst types
*/
private[sql] def toCatalystRDD(
relation: BaseRelation,
output: Seq[Attribute],
rdd: RDD[Row]): RDD[InternalRow] = {
if (relation.needConversion) {
val converters = RowEncoder(StructType.fromAttributes(output))
rdd.mapPartitions { iterator =>
iterator.map(converters.toRow)
}
} else {
rdd.asInstanceOf[RDD[InternalRow]]
}
}
}
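
toCatalystRDD is now shared by the v1 DataSourceStrategy and the new v1 fallback path in DataSourceV2Strategy. A minimal shell-style sketch of the per-row conversion it applies when needConversion is true, using an illustrative two-column schema and the encoder API as used in this hunk:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val encoder = RowEncoder(schema)
// External Row -> catalyst InternalRow: the same per-row step toCatalystRDD maps over each partition.
val internal: InternalRow = encoder.toRow(Row(1, "a"))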
@@ -19,40 +19,69 @@ package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Strategy extends Strategy with PredicateHelper {
class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {

import DataSourceV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

private def withProjectAndFilter(
project: Seq[NamedExpression],
filters: Seq[Expression],
scan: LeafExecNode,
needsUnsafeConversion: Boolean): SparkPlan = {
val filterCondition = filters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

if (withFilter.output != project || needsUnsafeConversion) {
ProjectExec(project, withFilter)
} else {
withFilter
}
}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters,
relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
if (v1Relation.schema != scan.readSchema()) {
throw new IllegalArgumentException(
"The fallback v1 relation reports inconsistent schema:\n" +
"Schema of v2 scan: " + scan.readSchema() + "\n" +
"Schema of v1 relation: " + v1Relation.schema)
}
val rdd = v1Relation.buildScan()
val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
val originalOutputNames = relation.table.schema().map(_.name)
val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
val dsScan = RowDataSourceScanExec(
Contributor: It seems like there is quite a bit of work done here in the strategy that should be in the exec node instead, like converting the RDD to a catalyst RDD. What about building a subclass of RowDataSourceScanExec that handles these concerns?

Contributor Author: This may be a good idea, but this is what we do for DS v1 (see the v1 rule DataSourceStrategy). I'd like to avoid changing the existing design choice we made before.

Contributor: I'd like to clean this up before committing this change. I'm not so sure that this was a purposeful design choice in v1. If so, what was the rationale for putting this logic in the strategy?

Contributor Author (@cloud-fan, Dec 24, 2019): I tried, and it's not easy to refactor. The query plans in Spark are all case classes, and you can't create a subclass of a case class. It's also only a few lines of code that can be encapsulated; the part that obtains the v1 scan should belong to the strategy.

If you have some ideas, please help and send a PR to my branch. I'm happy to see approaches that can make the code cleaner.

Contributor: How about an alternate constructor?

Contributor Author: Scala auxiliary constructors must call another constructor as the FIRST action, e.g.

def this(...) {
  doSomethingFirst()  // not allowed: the call to this(...) must come first
  this(...)
}

is not allowed.

We can create an object and add an apply method to encapsulate the 4 lines of code, but does it really matter? The 4 lines of code only appear once here.

output,
requiredColumnsIndex,
translated.toSet,
pushed.toSet,
unsafeRowRDD,
v1Relation,
tableIdentifier = None)
withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil

case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
// projection and filters were already pushed down in the optimizer.
// this uses PhysicalOperation to get the projection and ensure that if the batch scan does
// not support columnar, a projection is added to convert the rows to UnsafeRow.
val batchExec = BatchScanExec(relation.output, relation.scan)

val filterCondition = filters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, batchExec)).getOrElse(batchExec)

val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) {
ProjectExec(project, withFilter)
} else {
withFilter
}

withProjection :: Nil
withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
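The review thread above weighs moving this scan construction out of the strategy, and mentions an object with an apply method as one option. A hedged sketch of that option, purely illustrative: the object name, parameters, and placement are assumptions, not part of the PR. It mirrors the argument order used in the hunk and would need to live in the same package to reach the private[sql] helper.

// Illustrative only: a factory encapsulating the few lines the strategy runs inline for
// the v1 fallback case.
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.RowDataSourceScanExec
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{BaseRelation, Filter, TableScan}

object V1FallbackScanExec {
  def apply(
      v1Relation: BaseRelation with TableScan,
      output: Seq[Attribute],
      originalOutputNames: Seq[String],
      translated: Seq[Filter],
      pushed: Seq[Filter]): RowDataSourceScanExec = {
    // Build the v1 RDD, convert it to InternalRow, compute the required column indices,
    // and wrap everything in the row-based scan node.
    val rdd = v1Relation.buildScan()
    val internalRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
    val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
    RowDataSourceScanExec(
      output,
      requiredColumnsIndex,
      translated.toSet,
      pushed.toSet,
      internalRowRDD,
      v1Relation,
      tableIdentifier = None)
  }
}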
@@ -34,7 +34,7 @@ object PushDownUtils extends PredicateHelper {
*/
def pushFilters(
scanBuilder: ScanBuilder,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
filters: Seq[Expression]): (Seq[sources.Filter], Seq[Expression]) = {
scanBuilder match {
case r: SupportsPushDownFilters =>
// A map from translated data source leaf node filters to original catalyst filter
@@ -62,11 +62,7 @@
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(pushedFilters, untranslatableExprs ++ postScanFilters)
(r.pushedFilters(), untranslatableExprs ++ postScanFilters)

case _ => (Nil, filters)
}
@@ -75,7 +71,7 @@
/**
* Applies column pruning to the data source, w.r.t. the references of the given expressions.
*
* @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
* @return the `Scan` instance (since column pruning is the last step of operator pushdown),
* and new output attributes after column pruning.
*/
def pruneColumns(
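pushFilters now returns the data source Filters reported by pushedFilters() instead of rebuilding catalyst expressions from them. A hypothetical ScanBuilder showing the contract this relies on (all names here are illustrative): pushFilters returns the filters the source cannot handle, and pushedFilters reports the ones it accepted.

// Hypothetical source-side ScanBuilder, for illustration only.
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.types.{IntegerType, StructType}

class ExampleScanBuilder extends SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Accept only simple GreaterThan filters; return the rest as post-scan filters.
    val (supported, unsupported) = filters.partition(_.isInstanceOf[GreaterThan])
    pushed = supported
    unsupported
  }

  // Exactly what PushDownUtils now forwards as the pushed filters.
  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = new Scan {
    override def readSchema(): StructType = new StructType().add("i", IntegerType)
  }
}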
@@ -21,7 +21,10 @@ import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpressi
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.{Scan, V1Scan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType

object V2ScanRelationPushDown extends Rule[LogicalPlan] {
import DataSourceV2Implicits._
@@ -54,7 +57,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
|Output: ${output.mkString(", ")}
""".stripMargin)

val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output)
val wrappedScan = scan match {
case v1: V1Scan =>
val translated = filters.flatMap(DataSourceStrategy.translateFilter)
V1ScanWrapper(v1, translated, pushedFilters)
case _ => scan
}

val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)

val projectionOverSchema = ProjectionOverSchema(output.toStructType)
val projectionFunc = (expr: Expression) => expr transformDown {
@@ -77,3 +87,12 @@
withProjection
}
}

// A wrapper for v1 scan to carry the translated filters and the handled ones. This is required by
// the physical v1 scan node.
case class V1ScanWrapper(
v1Scan: V1Scan,
translatedFilters: Seq[sources.Filter],
handledFilters: Seq[sources.Filter]) extends Scan {
override def readSchema(): StructType = v1Scan.readSchema()
}
@@ -50,7 +50,7 @@ class IncrementalExecution(

// Modified planner with stateful operations.
override val planner: SparkPlanner = new SparkPlanner(
sparkSession.sparkContext,
sparkSession,
sparkSession.sessionState.conf,
sparkSession.sessionState.experimentalMethods) {
override def strategies: Seq[Strategy] =
@@ -250,7 +250,7 @@ abstract class BaseSessionStateBuilder(
* Note: this depends on the `conf` and `experimentalMethods` fields.
*/
protected def planner: SparkPlanner = {
new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
new SparkPlanner(session, conf, experimentalMethods) {
override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies
}
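Passing the whole SparkSession lets planner strategies such as the new DataSourceV2Strategy(session) reach session state directly. User strategies injected through the experimental hook can already capture the session themselves; a small illustrative sketch, e.g. for spark-shell (MyStrategy is not a real class):

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A no-op strategy that captures the session, purely for illustration.
case class MyStrategy(session: SparkSession) extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.experimental.extraStrategies = Seq(MyStrategy(spark))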