From d2f0dd55687329f2f9c348462956b628b7eed49f Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sun, 3 Mar 2019 22:20:31 -0800
Subject: [PATCH] [SPARK-26956][SS] remove streaming output mode from data source v2 APIs

## What changes were proposed in this pull request?

Similar to `SaveMode`, we should remove streaming `OutputMode` from the data source v2 API, and use operations that have clear semantics. The changes are:

1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create a `Write` that appends data.
2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending the new data of the current epoch. `SupportsTruncate` has exactly the same semantics.
3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.

The behavior changes:

1. all the v2 sinks (foreach, console, memory, kafka, noop) no longer support update mode. The fact is, all of them previously implemented update mode incorrectly; none of them can really support it.
2. the kafka sink no longer supports complete mode. The fact is, the kafka sink can only append data.

A minimal sketch of a sink written against the new builder operations is attached after the diff.

## How was this patch tested?

existing tests

Closes #23859 from cloud-fan/update.

Authored-by: Wenchen Fan
Signed-off-by: gatorsmile
---
 .../sql/kafka010/KafkaSourceProvider.scala    |  6 +--
 .../writer/streaming/SupportsOutputMode.java  | 29 -----------
 .../datasources/noop/NoopDataSource.scala     |  7 ++-
 .../streaming/MicroBatchExecution.scala       | 10 +---
 .../execution/streaming/StreamExecution.scala | 34 +++++++++++++
 .../sql/execution/streaming/console.scala     | 10 ++--
 .../continuous/ContinuousExecution.scala      | 10 +---
 .../sources/ForeachWriterTable.scala          | 11 ++--
 .../streaming/sources/memoryV2.scala          | 51 ++++++++-----------
 .../streaming/MemorySinkV2Suite.scala         |  4 +-
 10 files changed, 75 insertions(+), 97 deletions(-)
 delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6994517b27d6a..01bb1536aa6c5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -362,7 +362,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
 
     override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-      new WriteBuilder with SupportsOutputMode {
+      new WriteBuilder {
         private var inputSchema: StructType = _
 
         override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -370,8 +370,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           this
         }
 
-        override def outputMode(mode: OutputMode): WriteBuilder = this
-
         override def buildForStreaming(): StreamingWrite = {
           import scala.collection.JavaConverters._
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
deleted file mode 100644
index 832dcfa145d1b..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.Unstable;
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
-import org.apache.spark.sql.streaming.OutputMode;
-
-// TODO: remove it when we have `SupportsTruncate`
-@Unstable
-public interface SupportsOutputMode extends WriteBuilder {
-
-  WriteBuilder outputMode(OutputMode mode);
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 8f2072c586a94..22a74e3ccaeee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -42,9 +41,9 @@ private[noop] object NoopTable extends Table with SupportsBatchWrite with Suppor
 }
 
 private[noop] object NoopWriteBuilder extends WriteBuilder
-  with SupportsSaveMode with SupportsOutputMode {
+  with SupportsSaveMode with SupportsTruncate {
   override def mode(mode: SaveMode): WriteBuilder = this
-  override def outputMode(mode: OutputMode): WriteBuilder = this
+  override def truncate(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index cca279030dfa7..de7cbe25ceb3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -515,14 +514,7 @@ class MicroBatchExecution(
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: SupportsStreamingWrite =>
-        // TODO: we should translate OutputMode to concrete write actions like truncate, but
-        // the truncate action is being developed in SPARK-26666.
-        val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-          .withQueryId(runId.toString)
-          .withInputDataSchema(newAttributePlan.schema)
-        val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-          .outputMode(outputMode)
-          .buildForStreaming()
+        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
         WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 90f7b477103ae..ea522ec90b0a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.{Condition, ReentrantLock}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 import scala.util.control.NonFatal
 
@@ -34,10 +35,14 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -532,6 +537,35 @@ abstract class StreamExecution(
     Option(name).map(_ + "<br/>").getOrElse("") +
       s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
   }
+
+  protected def createStreamingWrite(
+      table: SupportsStreamingWrite,
+      options: Map[String, String],
+      inputPlan: LogicalPlan): StreamingWrite = {
+    val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
+      .withQueryId(runId.toString)
+      .withInputDataSchema(inputPlan.schema)
+    outputMode match {
+      case Append =>
+        writeBuilder.buildForStreaming()
+
+      case Complete =>
+        // TODO: we should do this check earlier when we have capability API.
+        require(writeBuilder.isInstanceOf[SupportsTruncate],
+          table.name + " does not support Complete mode.")
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+
+      case Update =>
+        // Although no v2 sinks really support Update mode now, but during tests we do want them
+        // to pretend to support Update mode, and treat Update mode same as Append mode.
+        if (Utils.isTesting) {
+          writeBuilder.buildForStreaming()
+        } else {
+          throw new IllegalArgumentException(
+            "Data source v2 streaming sinks does not support Update mode.")
+        }
+    }
+  }
 }
 
 object StreamExecution {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 348bc767b2c46..923bd749b29b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -21,9 +21,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType
 
 case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
@@ -64,7 +63,7 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _
 
       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -72,7 +71,8 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
         this
       }
 
-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Console sink is special that it just prints all the records.
+      override def truncate(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         assert(inputSchema != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 20101c7fda320..a1ac55ca4ce25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -175,14 +174,7 @@ class ContinuousExecution(
         "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }
 
-    // TODO: we should translate OutputMode to concrete write actions like truncate, but
-    // the truncate action is being developed in SPARK-26666.
-    val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-      .withQueryId(runId.toString)
-      .withInputDataSchema(withNewSources.schema)
-    val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-      .outputMode(outputMode)
-      .buildForStreaming()
+    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
     val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
 
     reportTimeTaken("queryPlanning") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 6fbb59c43625a..c0ae44a128ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -46,7 +45,7 @@ case class ForeachWriterTable[T](
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _
 
       override def withInputDataSchema(schema: StructType): WriteBuilder = {
@@ -54,7 +53,9 @@ case class ForeachWriterTable[T](
         this
       }
 
-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Foreach sink is special that it just forwards all the records to
+      // ForeachWriter.
+      override def truncate(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         new StreamingWrite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 3fc2cbe0fde57..397c5ff0dcb6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -30,12 +30,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -49,12 +47,12 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
-      private var mode: OutputMode = _
+    new WriteBuilder with SupportsTruncate {
+      private var needTruncate: Boolean = false
       private var inputSchema: StructType = _
 
-      override def outputMode(mode: OutputMode): WriteBuilder = {
-        this.mode = mode
+      override def truncate(): WriteBuilder = {
+        this.needTruncate = true
         this
       }
 
@@ -64,7 +62,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
       }
 
       override def buildForStreaming(): StreamingWrite = {
-        new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
+        new MemoryStreamingWrite(MemorySinkV2.this, inputSchema, needTruncate)
       }
     }
   }
@@ -101,27 +99,20 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
     }.mkString("\n")
   }
 
-  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
+  def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
     val notCommitted = synchronized {
       latestBatchId.isEmpty || batchId > latestBatchId.get
     }
     if (notCommitted) {
       logDebug(s"Committing batch $batchId to $this")
-      outputMode match {
-        case Append | Update =>
-          val rows = AddedData(batchId, newRows)
-          synchronized { batches += rows }
-
-        case Complete =>
-          val rows = AddedData(batchId, newRows)
-          synchronized {
-            batches.clear()
-            batches += rows
-          }
-
-        case _ =>
-          throw new IllegalArgumentException(
-            s"Output mode $outputMode is not supported by MemorySinkV2")
+      val rows = AddedData(batchId, newRows)
+      if (needTruncate) {
+        synchronized {
+          batches.clear()
+          batches += rows
+        }
+      } else {
+        synchronized { batches += rows }
       }
     } else {
logDebug(s"Skipping already committed batch: $batchId") @@ -139,18 +130,18 @@ case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} class MemoryStreamingWrite( - val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType) + val sink: MemorySinkV2, schema: StructType, needTruncate: Boolean) extends StreamingWrite { override def createStreamingWriterFactory: MemoryWriterFactory = { - MemoryWriterFactory(outputMode, schema) + MemoryWriterFactory(schema) } override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { val newRows = messages.flatMap { case message: MemoryWriterCommitMessage => message.data } - sink.write(epochId, outputMode, newRows) + sink.write(epochId, needTruncate, newRows) } override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { @@ -158,13 +149,13 @@ class MemoryStreamingWrite( } } -case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType) +case class MemoryWriterFactory(schema: StructType) extends DataWriterFactory with StreamingDataWriterFactory { override def createWriter( partitionId: Int, taskId: Long): DataWriter[InternalRow] = { - new MemoryDataWriter(partitionId, outputMode, schema) + new MemoryDataWriter(partitionId, schema) } override def createWriter( @@ -175,7 +166,7 @@ case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType) } } -class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType) +class MemoryDataWriter(partition: Int, schema: StructType) extends DataWriter[InternalRow] with Logging { private val data = mutable.Buffer[Row]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala index e804377540517..a90acf85c0161 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala @@ -29,7 +29,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { test("data writer") { val partition = 1234 val writer = new MemoryDataWriter( - partition, OutputMode.Append(), new StructType().add("i", "int")) + partition, new StructType().add("i", "int")) writer.write(InternalRow(1)) writer.write(InternalRow(2)) writer.write(InternalRow(44)) @@ -44,7 +44,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { test("streaming writer") { val sink = new MemorySinkV2 val write = new MemoryStreamingWrite( - sink, OutputMode.Append(), new StructType().add("i", "int")) + sink, new StructType().add("i", "int"), needTruncate = false) write.commit(0, Array( MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),