diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index ab1ce347cbe3..f6221ccfffd2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -148,6 +148,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   override def createSink(
+      schema: StructType,
       sqlContext: SQLContext,
       parameters: Map[String, String],
       partitionColumns: Seq[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f3b209deaae5..8a01b65a8c5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -260,10 +260,11 @@ case class DataSource(
   }
 
   /** Returns a sink that can be used to continually write data. */
-  def createSink(outputMode: OutputMode): Sink = {
+  def createSink(schema: StructType, outputMode: OutputMode): Sink = {
     providingClass.newInstance() match {
       case s: StreamSinkProvider =>
-        s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
+        s.createSink(schema, sparkSession.sqlContext,
+          caseInsensitiveOptions, partitionColumns, outputMode)
 
       case fileFormat: FileFormat =>
         val path = caseInsensitiveOptions.getOrElse("path", {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index e8b9712d19cd..2c6c1722db49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
 class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   // Number of rows to display, by default 20 rows
@@ -53,6 +54,7 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
 
 class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
   def createSink(
+      schema: StructType,
       sqlContext: SQLContext,
       parameters: Map[String, String],
       partitionColumns: Seq[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ff8b15b3ff3f..122055408ae3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -151,6 +151,7 @@ trait StreamSourceProvider {
 @InterfaceStability.Unstable
 trait StreamSinkProvider {
   def createSink(
+      schema: StructType,
       sqlContext: SQLContext,
       parameters: Map[String, String],
       partitionColumns: Seq[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 0d2611f9bbcc..a108b3cb502e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -285,7 +285,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
         df,
-        dataSource.createSink(outputMode),
+        dataSource.createSink(df.schema, outputMode),
         outputMode,
         useTempCheckpointLocation = useTempCheckpointLocation,
         recoverFromCheckpointLocation = recoverFromCheckpointLocation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index dc2506a48ad0..f56e96e728a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -96,6 +96,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
   }
 
   override def createSink(
+      schema: StructType,
       spark: SQLContext,
       parameters: Map[String, String],
       partitionColumns: Seq[String],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index 19ab2ff13e14..dd91ed4beaaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -57,6 +57,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
   }
 
   override def createSink(
+      schema: StructType,
       spark: SQLContext,
       parameters: Map[String, String],
       partitionColumns: Seq[String],
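
For context, a minimal sketch (not part of this patch) of how an external sink provider would implement the new signature: the query's output schema now arrives as the first argument to createSink, propagated from DataStreamWriter.start() as df.schema via DataSource.createSink(schema, outputMode). NullSink and NullSinkProvider below are hypothetical names used only for illustration.

// Illustrative only: a third-party provider written against the new createSink signature.
package org.example.streaming

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/** A sink that discards every batch; it only keeps the schema it was created with. */
class NullSink(val schema: StructType) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // No-op: the point is that `schema` was already available at construction time.
  }
}

class NullSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(
      schema: StructType,  // new first parameter introduced by this change
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    // The query's output schema (df.schema in DataStreamWriter.start()) can now be
    // inspected and validated before the first call to addBatch.
    require(schema.fields.nonEmpty, "expected at least one output column")
    new NullSink(schema)
  }

  override def shortName(): String = "null-sink"
}

Assuming such a provider is on the classpath, it could be wired up with something like df.writeStream.format(classOf[NullSinkProvider].getName).start(), or through its short name if registered in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.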