diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6994517b27d6a..01bb1536aa6c5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -362,7 +362,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder {
       private var inputSchema: StructType = _

       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -370,8 +370,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         this
       }

-      override def outputMode(mode: OutputMode): WriteBuilder = this
-
       override def buildForStreaming(): StreamingWrite = {
         import scala.collection.JavaConverters._

diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
deleted file mode 100644
index 832dcfa145d1b..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.Unstable;
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
-import org.apache.spark.sql.streaming.OutputMode;
-
-// TODO: remove it when we have `SupportsTruncate`
-@Unstable
-public interface SupportsOutputMode extends WriteBuilder {
-
-  WriteBuilder outputMode(OutputMode mode);
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 8f2072c586a94..22a74e3ccaeee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**
@@ -42,9 +41,9 @@ private[noop] object NoopTable extends Table with SupportsBatchWrite with Suppor
 }

 private[noop] object NoopWriteBuilder extends WriteBuilder
-  with SupportsSaveMode with SupportsOutputMode {
+  with SupportsSaveMode with SupportsTruncate {
   override def mode(mode: SaveMode): WriteBuilder = this
-  override def outputMode(mode: OutputMode): WriteBuilder = this
+  override def truncate(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index cca279030dfa7..de7cbe25ceb3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock

@@ -515,14 +514,7 @@ class MicroBatchExecution(
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: SupportsStreamingWrite =>
-        // TODO: we should translate OutputMode to concrete write actions like truncate, but
-        // the truncate action is being developed in SPARK-26666.
-        val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-          .withQueryId(runId.toString)
-          .withInputDataSchema(newAttributePlan.schema)
-        val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-          .outputMode(outputMode)
-          .buildForStreaming()
+        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
         WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 90f7b477103ae..ea522ec90b0a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.{Condition, ReentrantLock}

+import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 import scala.util.control.NonFatal

@@ -34,10 +35,14 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

@@ -532,6 +537,35 @@ abstract class StreamExecution(
     Option(name).map(_ + "<br/>").getOrElse("") +
       s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
   }
+
+  protected def createStreamingWrite(
+      table: SupportsStreamingWrite,
+      options: Map[String, String],
+      inputPlan: LogicalPlan): StreamingWrite = {
+    val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
+      .withQueryId(runId.toString)
+      .withInputDataSchema(inputPlan.schema)
+    outputMode match {
+      case Append =>
+        writeBuilder.buildForStreaming()
+
+      case Complete =>
+        // TODO: we should do this check earlier when we have the capability API.
+        require(writeBuilder.isInstanceOf[SupportsTruncate],
+          table.name + " does not support Complete mode.")
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+
+      case Update =>
+        // No v2 sink really supports Update mode yet, but during tests we do want sinks to
+        // pretend to support it, treating Update the same as Append.
+        if (Utils.isTesting) {
+          writeBuilder.buildForStreaming()
+        } else {
+          throw new IllegalArgumentException(
+            "Data source v2 streaming sinks do not support Update mode.")
+        }
+    }
+  }
 }

 object StreamExecution {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 348bc767b2c46..923bd749b29b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -21,9 +21,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType

 case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
@@ -64,7 +63,7 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
   override def schema(): StructType = StructType(Nil)

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _

       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -72,7 +71,8 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
         this
       }

-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Console sink is special in that it just prints all the records.
+      override def truncate(): WriteBuilder = this

       override def buildForStreaming(): StreamingWrite = {
         assert(inputSchema != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 20101c7fda320..a1ac55ca4ce25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock

@@ -175,14 +174,7 @@ class ContinuousExecution(
         "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }

-    // TODO: we should translate OutputMode to concrete write actions like truncate, but
-    // the truncate action is being developed in SPARK-26666.
-    val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-      .withQueryId(runId.toString)
-      .withInputDataSchema(withNewSources.schema)
-    val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-      .outputMode(outputMode)
-      .buildForStreaming()
+    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
     val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)

     reportTimeTaken("queryPlanning") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 6fbb59c43625a..c0ae44a128ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**
@@ -46,7 +45,7 @@ case class ForeachWriterTable[T](
   override def schema(): StructType = StructType(Nil)

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _

       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -54,7 +53,9 @@ case class ForeachWriterTable[T](
         this
       }

-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Foreach sink is special in that it just forwards all the
+      // records to ForeachWriter.
+      override def truncate(): WriteBuilder = this

       override def buildForStreaming(): StreamingWrite = {
         new StreamingWrite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 3fc2cbe0fde57..397c5ff0dcb6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -30,12 +30,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**
@@ -49,12 +47,12 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
   override def schema(): StructType = StructType(Nil)

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
-      private var mode: OutputMode = _
+    new WriteBuilder with SupportsTruncate {
+      private var needTruncate: Boolean = false
       private var inputSchema: StructType = _

-      override def outputMode(mode: OutputMode): WriteBuilder = {
-        this.mode = mode
+      override def truncate(): WriteBuilder = {
+        this.needTruncate = true
         this
       }

@@ -64,7 +62,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
       }

       override def buildForStreaming(): StreamingWrite = {
-        new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
+        new MemoryStreamingWrite(MemorySinkV2.this, inputSchema, needTruncate)
       }
     }
   }
@@ -101,27 +99,20 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
     }.mkString("\n")
   }

-  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
+  def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
     val notCommitted = synchronized {
       latestBatchId.isEmpty || batchId > latestBatchId.get
     }
     if (notCommitted) {
       logDebug(s"Committing batch $batchId to $this")
-      outputMode match {
-        case Append | Update =>
-          val rows = AddedData(batchId, newRows)
-          synchronized { batches += rows }
-
-        case Complete =>
-          val rows = AddedData(batchId, newRows)
-          synchronized {
-            batches.clear()
-            batches += rows
-          }
-
-        case _ =>
-          throw new IllegalArgumentException(
-            s"Output mode $outputMode is not supported by MemorySinkV2")
+      val rows = AddedData(batchId, newRows)
+      if (needTruncate) {
+        synchronized {
+          batches.clear()
+          batches += rows
+        }
+      } else {
+        synchronized { batches += rows }
       }
     } else {
       logDebug(s"Skipping already committed batch: $batchId")
@@ -139,18 +130,18 @@ case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
   extends WriterCommitMessage {}

 class MemoryStreamingWrite(
-    val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
+    val sink: MemorySinkV2, schema: StructType, needTruncate: Boolean)
   extends StreamingWrite {

   override def createStreamingWriterFactory: MemoryWriterFactory = {
-    MemoryWriterFactory(outputMode, schema)
+    MemoryWriterFactory(schema)
   }

   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     val newRows = messages.flatMap {
       case message: MemoryWriterCommitMessage => message.data
     }
-    sink.write(epochId, outputMode, newRows)
+    sink.write(epochId, needTruncate, newRows)
   }

   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
@@ -158,13 +149,13 @@ class MemoryStreamingWrite(
   }
 }

-case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
+case class MemoryWriterFactory(schema: StructType)
   extends DataWriterFactory with StreamingDataWriterFactory {

   override def createWriter(
       partitionId: Int,
       taskId: Long): DataWriter[InternalRow] = {
-    new MemoryDataWriter(partitionId, outputMode, schema)
+    new MemoryDataWriter(partitionId, schema)
   }

   override def createWriter(
@@ -175,7 +166,7 @@ case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
   }
 }

-class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)
+class MemoryDataWriter(partition: Int, schema: StructType)
   extends DataWriter[InternalRow] with Logging {

   private val data = mutable.Buffer[Row]()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index e804377540517..a90acf85c0161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -29,7 +29,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("data writer") {
     val partition = 1234
     val writer = new MemoryDataWriter(
-      partition, OutputMode.Append(), new StructType().add("i", "int"))
+      partition, new StructType().add("i", "int"))
     writer.write(InternalRow(1))
     writer.write(InternalRow(2))
     writer.write(InternalRow(44))
@@ -44,7 +44,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("streaming writer") {
     val sink = new MemorySinkV2
     val write = new MemoryStreamingWrite(
-      sink, OutputMode.Append(), new StructType().add("i", "int"))
+      sink, new StructType().add("i", "int"), needTruncate = false)
     write.commit(0, Array(
       MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),