diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala index a1253dfe67e7..27561857c122 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala @@ -274,11 +274,13 @@ class InMemoryTable( this } - override def buildForBatch(): BatchWrite = writer + override def build(): Write = new Write { + override def toBatch: BatchWrite = writer - override def buildForStreaming(): StreamingWrite = streamingWriter match { - case exc: StreamingNotSupportedOperation => exc.throwsException() - case s => s + override def toStreaming: StreamingWrite = streamingWriter match { + case exc: StreamingNotSupportedOperation => exc.throwsException() + case s => s + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index c9f40fa22bf9..67803ad76d5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -627,21 +627,22 @@ abstract class StreamExecution( inputPlan.schema, new CaseInsensitiveStringMap(options.asJava)) val writeBuilder = table.newWriteBuilder(info) - outputMode match { + val write = outputMode match { case Append => - writeBuilder.buildForStreaming() + writeBuilder.build() case Complete => // TODO: we should do this check earlier when we have capability API. require(writeBuilder.isInstanceOf[SupportsTruncate], table.name + " does not support Complete mode.") - writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming() + writeBuilder.asInstanceOf[SupportsTruncate].truncate().build() case Update => require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend], table.name + " does not support Update mode.") - writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].buildForStreaming() + writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build() } + write.toStreaming } protected def purge(threshold: Long): Unit = {