[SPARK-26811][SQL] Add capabilities to v2.Table
This adds a new method, `capabilities`, to `v2.Table` that returns a set of `TableCapability`. Capabilities are used in analysis checks, such as `V2WriteSupportCheck`, to fail queries when a table does not support an operation, like truncation.

Existing tests cover regressions; a new analysis suite, `V2WriteSupportCheckSuite`, was added for the new capability checks.
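
For illustration only, a minimal sketch of a table that declares its capabilities through the new API. `ExampleTable` and its schema are hypothetical; the `Table`, `SupportsRead`, and `TableCapability` types and method signatures are the ones touched by this patch.

```scala
import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical read-only table that advertises BATCH_READ via the new capabilities() method.
class ExampleTable extends Table with SupportsRead {
  override def name(): String = "example"

  override def schema(): StructType = new StructType().add("id", "long")

  // The method added by this commit: declare supported operations as a set of capabilities.
  override def capabilities(): JSet[TableCapability] =
    Set(TableCapability.BATCH_READ).asJava

  // Required by SupportsRead; building the actual Scan is out of scope for this sketch.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
}
```

Analysis checks such as `V2WriteSupportCheck` can then reject a plan when a required capability is missing, instead of relying on marker interfaces like the removed `SupportsBatchRead`.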

Closes apache#24012 from rdblue/SPARK-26811-add-capabilities.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
rdblue authored and mccheah committed May 15, 2019
1 parent 1609b3f commit 8f9c5ac
Showing 32 changed files with 417 additions and 114 deletions.
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.{Locale, UUID}
import java.util.{Collections, Locale, UUID}

import scala.collection.JavaConverters._

@@ -358,6 +358,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}

This file was deleted.

This file was deleted.

@@ -26,7 +26,7 @@
* {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan for batch,
* micro-batch, or continuous processing.
*/
interface SupportsRead extends Table {
public interface SupportsRead extends Table {

/**
* Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this
@@ -26,7 +26,7 @@
* {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* for batch or streaming.
*/
interface SupportsWrite extends Table {
public interface SupportsWrite extends Table {

/**
* Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
@@ -20,16 +20,15 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.StructType;

import java.util.Set;

/**
* An interface representing a logical structured data set of a data source. For example, the
* implementation can be a directory on the file system, a topic of Kafka, or a table in the
* catalog, etc.
* <p>
* This interface can mixin the following interfaces to support different operations:
* </p>
* <ul>
* <li>{@link SupportsBatchRead}: this table can be read in batch queries.</li>
* </ul>
* This interface can mixin the following interfaces to support different operations, like
* {@code SupportsRead}.
*/
@Evolving
public interface Table {
@@ -45,4 +44,9 @@ public interface Table {
* empty schema can be returned here.
*/
StructType schema();

/**
* Returns the set of capabilities for this table.
*/
Set<TableCapability> capabilities();
}
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Experimental;

/**
* Capabilities that can be provided by a {@link Table} implementation.
* <p>
* Tables use {@link Table#capabilities()} to return a set of capabilities. Each capability signals
* to Spark that the table supports a feature identified by the capability. For example, returning
* {@code BATCH_READ} allows Spark to read from the table using a batch scan.
*/
@Experimental
public enum TableCapability {
/**
* Signals that the table supports reads in batch execution mode.
*/
BATCH_READ,

/**
* Signals that the table supports append writes in batch execution mode.
* <p>
* Tables that return this capability must support appending data and may also support additional
* write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
* {@link #OVERWRITE_DYNAMIC}.
*/
BATCH_WRITE,

/**
* Signals that the table can be truncated in a write operation.
* <p>
* Truncating a table removes all existing rows.
* <p>
* See {@link org.apache.spark.sql.sources.v2.writer.SupportsTruncate}.
*/
TRUNCATE,

/**
* Signals that the table can replace existing data that matches a filter with appended data in
* a write operation.
* <p>
* See {@link org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}.
*/
OVERWRITE_BY_FILTER,

/**
* Signals that the table can dynamically replace existing data partitions with appended data in
* a write operation.
* <p>
* See {@link org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
*/
OVERWRITE_DYNAMIC
}
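
As a rough companion sketch (the object name is hypothetical; the enum constants are the ones defined above), a writable table would return the write modes it can handle, which is what the `DataFrameWriter` change below checks before planning an overwrite:

```scala
import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical table that supports batch appends and truncation, but not
// filter-based or dynamic partition overwrites.
object ExampleWritableTable extends Table with SupportsWrite {
  override def name(): String = "example-writable"

  override def schema(): StructType = new StructType()

  override def capabilities(): JSet[TableCapability] =
    Set(BATCH_WRITE, TRUNCATE).asJava

  // WriteBuilder construction is elided; NoopWriteBuilder in this diff is a minimal example.
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = ???
}
```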
@@ -21,7 +21,6 @@
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
@@ -33,8 +32,8 @@
* This logical representation is shared between batch scan, micro-batch streaming scan and
* continuous streaming scan. Data sources must implement the corresponding methods in this
* interface, to match what the table promises to support. For example, {@link #toBatch()} must be
* implemented, if the {@link Table} that creates this {@link Scan} implements
* {@link SupportsBatchRead}.
* implemented, if the {@link Table} that creates this {@link Scan} returns BATCH_READ support in
* its {@link Table#capabilities()}.
* </p>
*/
@Evolving
@@ -62,7 +61,7 @@ default String description() {
/**
* Returns the physical representation of this scan for batch query. By default this method throws
* exception, data sources must overwrite this method to provide an implementation, if the
* {@link Table} that creates this scan implements {@link SupportsBatchRead}.
{@link Table} that creates this scan returns batch read support in its {@link Table#capabilities()}.
*
* @throws UnsupportedOperationException
*/
@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
import org.apache.spark.sql.types.StructType;
@@ -58,7 +57,8 @@ default WriteBuilder withInputDataSchema(StructType schema) {
/**
* Returns a {@link BatchWrite} to write data to batch source. By default this method throws
* exception, data sources must overwrite this method to provide an implementation, if the
* {@link Table} that creates this scan implements {@link SupportsBatchWrite}.
* {@link Table} that creates this write returns BATCH_WRITE support in its
* {@link Table#capabilities()}.
*
* Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean it up after removing
@@ -37,8 +37,9 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@@ -221,8 +222,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
table match {
case _: SupportsBatchRead =>
case _: SupportsRead if table.supports(BATCH_READ) =>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, dsOptions))

case _ => loadV1Source(paths: _*)
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -265,16 +266,18 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val checkFilesExistsOption = "check_files_exist" -> "false"
val options = sessionOptions ++ extraOptions + checkFilesExistsOption
val dsOptions = new CaseInsensitiveStringMap(options.asJava)

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
case table: SupportsBatchWrite =>
case table: SupportsWrite if table.supports(BATCH_WRITE) =>
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
mode match {
case SaveMode.Append =>
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
}

case SaveMode.Overwrite =>
case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
// truncate the table
runCommand(df.sparkSession, "save") {
OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
@@ -17,6 +17,10 @@

package org.apache.spark.sql.execution.datasources.noop

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
@@ -35,10 +39,11 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}

private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite {
private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
}

private[noop] object NoopWriteBuilder extends WriteBuilder
@@ -18,26 +18,30 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}

object DataSourceV2Implicits {
implicit class TableHelper(table: Table) {
def asBatchReadable: SupportsBatchRead = {
def asReadable: SupportsRead = {
table match {
case support: SupportsBatchRead =>
case support: SupportsRead =>
support
case _ =>
throw new AnalysisException(s"Table does not support batch reads: ${table.name}")
throw new AnalysisException(s"Table does not support reads: ${table.name}")
}
}

def asBatchWritable: SupportsBatchWrite = {
def asWritable: SupportsWrite = {
table match {
case support: SupportsBatchWrite =>
case support: SupportsWrite =>
support
case _ =>
throw new AnalysisException(s"Table does not support batch writes: ${table.name}")
throw new AnalysisException(s"Table does not support writes: ${table.name}")
}
}

def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)

def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
}
}
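
A short usage sketch of these Spark-internal helpers (the `CapabilityChecks` object and `canOverwrite` function are hypothetical; `supports`, `supportsAny`, and the imports are the ones added in this file and used by `DataFrameReader`/`DataFrameWriter` above):

```scala
object CapabilityChecks {
  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
  import org.apache.spark.sql.sources.v2.Table
  import org.apache.spark.sql.sources.v2.TableCapability._

  // Mirrors the SaveMode.Overwrite guard added to DataFrameWriter in this commit:
  // the table must support batch writes and at least one overwrite-capable mode.
  def canOverwrite(table: Table): Boolean =
    table.supports(BATCH_WRITE) && table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)
}
```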
@@ -49,7 +49,7 @@ case class DataSourceV2Relation(
}

def newScanBuilder(): ScanBuilder = {
table.asBatchReadable.newScanBuilder(options)
table.asReadable.newScanBuilder(options)
}

override def computeStats(): Statistics = {
@@ -147,7 +147,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

case AppendData(r: DataSourceV2Relation, query, _) =>
AppendDataExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
@@ -157,10 +157,10 @@
}.toArray

OverwriteByExpressionExec(
r.table.asBatchWritable, filters, r.options, planLater(query)) :: Nil
r.table.asWritable, filters, r.options, planLater(query)) :: Nil

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
OverwritePartitionsDynamicExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil

case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil