
[SPARK-26956][SS] remove streaming output mode from data source v2 APIs
## What changes were proposed in this pull request?

Similar to `SaveMode`, we should remove the streaming `OutputMode` from the data source v2 API and use operations that have clear semantics.

The changes are (a sketch of the resulting translation follows this list):
1. Append mode: create the `StreamingWrite` directly. By default, the `WriteBuilder` will create a `Write` that appends data.
2. Complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and then appending the new data of the current epoch; `SupportsTruncate` has exactly the same semantics.
3. Update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.
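
A rough sketch of the engine-side translation (a simplified form of the `createStreamingWrite` helper this patch adds to `StreamExecution`; the real method also calls `withQueryId` and `withInputDataSchema` on the builder first and special-cases Update mode in tests, and the wrapper object here is only for illustration):

```scala
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode

object OutputModeTranslation {
  // Map the query's OutputMode to concrete WriteBuilder operations.
  def buildStreamingWrite(builder: WriteBuilder, outputMode: OutputMode): StreamingWrite =
    outputMode match {
      case Append =>
        // Append needs no extra capability: by default a WriteBuilder appends data.
        builder.buildForStreaming()
      case Complete =>
        // Complete means "truncate all old data, then append the current epoch".
        require(builder.isInstanceOf[SupportsTruncate], "sink does not support Complete mode")
        builder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
      case Update =>
        // No SupportsUpdate trait exists yet, so Update mode is rejected for v2 sinks.
        throw new IllegalArgumentException("Data source v2 streaming sinks do not support Update mode")
    }
}
```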

The behavior changes (a sink-side sketch follows this list):
1. All the v2 sinks (foreach, console, memory, kafka, noop) no longer support update mode. In fact, all of them previously implemented update mode incorrectly; none of them can really support it.
2. The kafka sink no longer supports complete mode. In fact, the kafka sink can only append data.
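
On the sink side, opting in to Complete mode is just a matter of mixing `SupportsTruncate` into the returned `WriteBuilder`, which is what the console, foreach, memory and noop sinks now do. A minimal, hypothetical builder (the actual `StreamingWrite` construction is elided with `???`):

```scala
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType

// Hypothetical sink-side builder. A sink that can only append (e.g. kafka)
// simply does not mix in SupportsTruncate, so the engine rejects Complete
// mode for it before any data is written.
class ExampleWriteBuilder extends WriteBuilder with SupportsTruncate {
  private var inputSchema: StructType = _

  override def withInputDataSchema(schema: StructType): WriteBuilder = {
    this.inputSchema = schema
    this
  }

  // For a sink like console this can be a no-op; a real table sink would
  // record the request and clear its old data when the epoch commits.
  override def truncate(): WriteBuilder = this

  override def buildForStreaming(): StreamingWrite = ??? // build the sink's StreamingWrite here
}
```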

## How was this patch tested?

Existing tests.
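
For reference, a query like the following exercises the new Complete-mode path end to end (illustrative only, not part of this patch's test suite; assumes a running `SparkSession` named `spark`):

```scala
// A streaming aggregation written in Complete mode: with this patch the console
// sink's WriteBuilder is cast to SupportsTruncate and truncate() is called,
// instead of an OutputMode being handed to the sink.
val counts = spark.readStream
  .format("rate")         // built-in source producing (timestamp, value) rows
  .load()
  .groupBy("value")
  .count()

val query = counts.writeStream
  .outputMode("complete") // "update" would now be rejected for v2 sinks outside of tests
  .format("console")
  .start()
```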

Closes apache#23859 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and mccheah committed May 15, 2019
1 parent caa5fab commit d2f0dd5
Showing 10 changed files with 75 additions and 97 deletions.
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -362,16 +362,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
new WriteBuilder {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
import scala.collection.JavaConverters._


This file was deleted.

@@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -42,9 +41,9 @@ private[noop] object NoopTable extends Table with SupportsBatchWrite with Suppor
}

private[noop] object NoopWriteBuilder extends WriteBuilder
with SupportsSaveMode with SupportsOutputMode {
with SupportsSaveMode with SupportsTruncate {
override def mode(mode: SaveMode): WriteBuilder = this
override def outputMode(mode: OutputMode): WriteBuilder = this
override def truncate(): WriteBuilder = this
override def buildForBatch(): BatchWrite = NoopBatchWrite
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
}
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock

@@ -515,14 +514,7 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: SupportsStreamingWrite =>
// TODO: we should translate OutputMode to concrete write actions like truncate, but
// the truncate action is being developed in SPARK-26666.
val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
.withQueryId(runId.toString)
.withInputDataSchema(newAttributePlan.schema)
val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
.outputMode(outputMode)
.buildForStreaming()
val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.{Condition, ReentrantLock}

import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import scala.util.control.NonFatal

@@ -34,10 +35,14 @@ import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

@@ -532,6 +537,35 @@ abstract class StreamExecution(
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}

protected def createStreamingWrite(
table: SupportsStreamingWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
.withQueryId(runId.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
case Append =>
writeBuilder.buildForStreaming()

case Complete =>
// TODO: we should do this check earlier when we have capability API.
require(writeBuilder.isInstanceOf[SupportsTruncate],
table.name + " does not support Complete mode.")
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()

case Update =>
// Although no v2 sink really supports Update mode now, during tests we do want them to
// pretend to support Update mode and treat it the same as Append mode.
if (Utils.isTesting) {
writeBuilder.buildForStreaming()
} else {
throw new IllegalArgumentException(
"Data source v2 streaming sinks does not support Update mode.")
}
}
}
}

object StreamExecution {
@@ -21,9 +21,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType

case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
@@ -64,15 +63,16 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
override def schema(): StructType = StructType(Nil)

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this
// Do nothing for truncate. The console sink is special in that it just prints all the records.
override def truncate(): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
assert(inputSchema != null)
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock

@@ -175,14 +174,7 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}

// TODO: we should translate OutputMode to concrete write actions like truncate, but
// the truncate action is being developed in SPARK-26666.
val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
.withQueryId(runId.toString)
.withInputDataSchema(withNewSources.schema)
val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
.outputMode(outputMode)
.buildForStreaming()
val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)

reportTimeTaken("queryPlanning") {
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.python.PythonForeachWriter
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -46,15 +45,17 @@ case class ForeachWriterTable[T](
override def schema(): StructType = StructType(Nil)

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this
// Do nothing for truncate. The foreach sink is special in that it just forwards all the
// records to the ForeachWriter.
override def truncate(): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
new StreamingWrite {
@@ -30,12 +30,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -49,12 +47,12 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
override def schema(): StructType = StructType(Nil)

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
private var mode: OutputMode = _
new WriteBuilder with SupportsTruncate {
private var needTruncate: Boolean = false
private var inputSchema: StructType = _

override def outputMode(mode: OutputMode): WriteBuilder = {
this.mode = mode
override def truncate(): WriteBuilder = {
this.needTruncate = true
this
}

@@ -64,7 +62,7 @@ }
}

override def buildForStreaming(): StreamingWrite = {
new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
new MemoryStreamingWrite(MemorySinkV2.this, inputSchema, needTruncate)
}
}
}
Expand Down Expand Up @@ -101,27 +99,20 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
}.mkString("\n")
}

def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
val notCommitted = synchronized {
latestBatchId.isEmpty || batchId > latestBatchId.get
}
if (notCommitted) {
logDebug(s"Committing batch $batchId to $this")
outputMode match {
case Append | Update =>
val rows = AddedData(batchId, newRows)
synchronized { batches += rows }

case Complete =>
val rows = AddedData(batchId, newRows)
synchronized {
batches.clear()
batches += rows
}

case _ =>
throw new IllegalArgumentException(
s"Output mode $outputMode is not supported by MemorySinkV2")
val rows = AddedData(batchId, newRows)
if (needTruncate) {
synchronized {
batches.clear()
batches += rows
}
} else {
synchronized { batches += rows }
}
} else {
logDebug(s"Skipping already committed batch: $batchId")
@@ -139,32 +130,32 @@ case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
extends WriterCommitMessage {}

class MemoryStreamingWrite(
val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
val sink: MemorySinkV2, schema: StructType, needTruncate: Boolean)
extends StreamingWrite {

override def createStreamingWriterFactory: MemoryWriterFactory = {
MemoryWriterFactory(outputMode, schema)
MemoryWriterFactory(schema)
}

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
val newRows = messages.flatMap {
case message: MemoryWriterCommitMessage => message.data
}
sink.write(epochId, outputMode, newRows)
sink.write(epochId, needTruncate, newRows)
}

override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
// Don't accept any of the new input.
}
}

case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
case class MemoryWriterFactory(schema: StructType)
extends DataWriterFactory with StreamingDataWriterFactory {

override def createWriter(
partitionId: Int,
taskId: Long): DataWriter[InternalRow] = {
new MemoryDataWriter(partitionId, outputMode, schema)
new MemoryDataWriter(partitionId, schema)
}

override def createWriter(
@@ -175,7 +166,7 @@ case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
}
}

class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)
class MemoryDataWriter(partition: Int, schema: StructType)
extends DataWriter[InternalRow] with Logging {

private val data = mutable.Buffer[Row]()
@@ -29,7 +29,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
test("data writer") {
val partition = 1234
val writer = new MemoryDataWriter(
partition, OutputMode.Append(), new StructType().add("i", "int"))
partition, new StructType().add("i", "int"))
writer.write(InternalRow(1))
writer.write(InternalRow(2))
writer.write(InternalRow(44))
@@ -44,7 +44,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
test("streaming writer") {
val sink = new MemorySinkV2
val write = new MemoryStreamingWrite(
sink, OutputMode.Append(), new StructType().add("i", "int"))
sink, new StructType().add("i", "int"), needTruncate = false)
write.commit(0,
Array(
MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
