diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 7c043acb499c..6bb4e1feb3a9 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -428,13 +428,15 @@ The Dataframe being written to Kafka should have the following columns in schema
string |
-\* The topic column is required if the "topic" configuration option is not specified.
+\* The topic column is required if neither the "topic" nor "path" configuration option is specified.
The value column is the only required option. If a key column is not specified then
a ```null``` valued key column will be automatically added (see Kafka semantics on
how ```null``` valued key values are handled). If a topic column exists then its value
-is used as the topic when writing the given row to Kafka, unless the "topic" configuration
-option is set i.e., the "topic" configuration option overrides the topic column.
+is used as the topic when writing the given row to Kafka, unless the "topic" or "path"
+configuration option is set; that is, the "topic" option overrides both the "path" option
+and the topic column, and the "path" option overrides the topic column.
The following options must be set for the Kafka sink
for both batch and streaming queries.
@@ -457,8 +459,17 @@ The following configurations are optional:
string |
none |
streaming and batch |
+ Sets the topic that all rows will be written to in Kafka. This option overrides
+  the "path" option and any topic column that may exist in the data. |
+
+
+ | path |
+ string |
+ none |
+ streaming and batch |
Sets the topic that all rows will be written to in Kafka. This option overrides any
- topic column that may exist in the data. |
+  topic column that may exist in the data and is overridden by the "topic" option.
+  However, if both the "topic" and "path" options are specified, they must match. |
@@ -468,30 +479,50 @@ The following configurations are optional:
{% highlight scala %}
-// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
-val ds = df
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
+val q = df
+ .select($"key" cast "string", $"value" cast "string")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
-// Write key-value data from a DataFrame to Kafka using a topic specified in the data
-val ds = df
- .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+// Write key-value data from a DataFrame to Kafka using the topic specified in the data
+val q = df
+ .select($"topic", $"key" cast "string", $"value" cast "string")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
+// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
+val q = df
+ .select($"key" cast "string", $"value" cast "string")
+ .writeStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .start("topic2")
+
+// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and
+// both the topic and path options are specified. In this case the topic is taken from the
+// topic option. You do not need to specify the topic in all three places, but if both the
+// topic and path options are specified, they must match.
+val q = df
+ .select($"topic", $"key" cast "string", $"value" cast "string")
+ .writeStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("topic", "topic3")
+ .start("topic3")
+
{% endhighlight %}
{% highlight java %}
-// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
-StreamingQuery ds = df
+// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
+StreamingQuery q = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
@@ -499,21 +530,41 @@ StreamingQuery ds = df
.option("topic", "topic1")
.start();
-// Write key-value data from a DataFrame to Kafka using a topic specified in the data
-StreamingQuery ds = df
+// Write key-value data from a DataFrame to Kafka using the topic specified in the data
+StreamingQuery q = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
+// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
+StreamingQuery q = df
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .writeStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .start("topic2");
+
+// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and
+// both the topic and path options are specified. In this case the topic is taken from the
+// topic option. You do not need to specify the topic in all three places, but if both the
+// topic and path options are specified, they must match.
+StreamingQuery q = df
+ .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+ .writeStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("topic", "topic3")
+ .start("topic3");
+
{% endhighlight %}
{% highlight python %}
-# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
-ds = df \
+# Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
+q = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
@@ -521,14 +572,34 @@ ds = df \
.option("topic", "topic1") \
.start()
-# Write key-value data from a DataFrame to Kafka using a topic specified in the data
-ds = df \
+# Write key-value data from a DataFrame to Kafka using the topic specified in the data
+q = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
+# Write key-value data from a DataFrame to Kafka using the topic specified in the path option
+q = df \
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+ .writeStream \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .start("topic2")
+
+# Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and
+# both the topic and path options are specified. In this case the topic is taken from the
+# topic option. You do not need to specify the topic in all three places, but if both the
+# topic and path options are specified, they must match.
+q = df \
+ .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+ .writeStream \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("topic", "topic3") \
+ .start("topic3")
+
{% endhighlight %}
@@ -539,27 +610,45 @@ ds = df \
{% highlight scala %}
-// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
+df.select($"key" cast "string", $"value" cast "string")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
-// Write key-value data from a DataFrame to Kafka using a topic specified in the data
-df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+// Write key-value data from a DataFrame to Kafka using the topic specified in the data
+df.select($"topic", $"key" cast "string", $"value" cast "string")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
+// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
+df.select($"key" cast "string", $"value" cast "string")
+ .write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .save("topic2")
+
+// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and
+// both the topic and path options are specified. In this case the topic is taken from the
+// topic option. You do not need to specify the topic in all three places, but if both the
+// topic and path options are specified, they must match.
+df.select($"topic", $"key" cast "string", $"value" cast "string")
+ .write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("topic", "topic3")
+ .save("topic3")
+
{% endhighlight %}
{% highlight java %}
-// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
@@ -567,19 +656,37 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.option("topic", "topic1")
.save();
-// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+// Write key-value data from a DataFrame to Kafka using the topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
+// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .write()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .save("topic2");
+
+// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and
+// both the topic and path options are specified. In this case the topic is taken from the
+// topic option. You do not need to specify the topic in all three places, but if both the
+// topic and path options are specified, they must match.
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+ .write()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("topic", "topic3")
+ .save("topic3");
+
{% endhighlight %}
{% highlight python %}
-# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+# Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
@@ -587,13 +694,31 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.option("topic", "topic1") \
.save()
-# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+# Write key-value data from a DataFrame to Kafka using the topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
+# Write key-value data from a DataFrame to Kafka using the topic specified in the path option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+ .write \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .save("topic2")
+
+# Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and
+# both the topic and path options are specified. In this case the topic is taken from the
+# topic option. You do not need to specify the topic in all three places, but if both the
+# topic and path options are specified, they must match.
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+ .write \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("topic", "topic3") \
+ .save("topic3")
+
{% endhighlight %}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a8eff6be7ddd..24c7ae16cc91 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -144,7 +144,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
- val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+ val defaultTopic = resolveTopic(parameters)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic)
}
@@ -161,7 +161,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
- val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+ val topic = resolveTopic(parameters)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, specifiedKafkaParams,
topic)
@@ -196,6 +196,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
throw new IllegalArgumentException("Unknown option")
}
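+
+  /**
+   * Resolves the default topic from the "topic" and "path" options. If both are specified
+   * they must match; otherwise whichever one is present, if any, is used.
+   */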
+ private def resolveTopic(parameters: Map[String, String]): Option[String] = {
+ val topicOption = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+ val pathOption = parameters.get(DataSourceOptions.PATH_KEY).map(_.trim)
+
+ (topicOption, pathOption) match {
+ case (Some(t), Some(p)) if t != p =>
+        throw new IllegalArgumentException(
+          s"'topic' and 'path' options must match if both defined: '$t' does not match '$p'")
+ case (Some(_), _) => topicOption
+ case (None, Some(_)) => pathOption
+ case _ => None
+ }
+ }
+
private def failOnDataLoss(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.getOrElse(FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toBoolean
@@ -378,8 +392,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
import scala.collection.JavaConverters._
assert(inputSchema != null)
- val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
- val producerParams = kafkaParamsForProducer(options.asScala.toMap)
+ val params = options.asScala.toMap
+ val topic = resolveTopic(params)
+ val producerParams = kafkaParamsForProducer(params)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index e3101e157208..ba02a1509542 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
@@ -46,7 +47,7 @@ class KafkaStreamingWrite(
schema: StructType)
extends StreamingWrite {
- validateQuery(schema.toAttributes, producerParams, topic)
+ validateQuery(schema.toAttributes, topic, m => new AnalysisException(m))
override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, schema)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
index 041fac771763..86af28487034 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecor
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.{BinaryType, StringType}
/**
@@ -86,7 +87,8 @@ private[kafka010] abstract class KafkaRowWriter(
val value = projectedRow.getBinary(2)
if (topic == null) {
throw new NullPointerException(s"null topic present in the data. Use the " +
- s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
+ s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option or " +
+ s"${DataSourceOptions.PATH_KEY} option for setting a default topic.")
}
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
producer.send(record, callback)
@@ -99,38 +101,15 @@ private[kafka010] abstract class KafkaRowWriter(
}
private def createProjection = {
- val topicExpression = topic.map(Literal(_)).orElse {
- inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
- }.getOrElse {
- throw new IllegalStateException(s"topic option required when no " +
- s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
- }
- topicExpression.dataType match {
- case StringType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
- s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
- s"must be a ${StringType.catalogString}")
- }
+    KafkaWriter.validateQuery(inputSchema, topic, message => new IllegalStateException(message))
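+    // validateQuery above guarantees that a topic (option or column) and a value column
+    // are present, so the .get calls below cannot fail.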
+ val topicExpression = topic.map(t => Literal.create(t, StringType))
+ .orElse(inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME))
+ .get
val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
.getOrElse(Literal(null, BinaryType))
- keyExpression.dataType match {
- case StringType | BinaryType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
- s"attribute unsupported type ${t.catalogString}")
- }
- val valueExpression = inputSchema
- .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
- throw new IllegalStateException("Required attribute " +
- s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
- )
- valueExpression.dataType match {
- case StringType | BinaryType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
- s"attribute unsupported type ${t.catalogString}")
- }
+ val valueExpression = inputSchema.find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME)
+ .get
+
UnsafeProjection.create(
Seq(topicExpression, Cast(keyExpression, BinaryType),
Cast(valueExpression, BinaryType)), inputSchema)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index e1a9191cc5a8..b5da66d5e2c5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -22,17 +22,17 @@ import java.{util => ju}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.{BinaryType, StringType}
import org.apache.spark.util.Utils
/**
- * The [[KafkaWriter]] class is used to write data from a batch query
- * or structured streaming query, given by a [[QueryExecution]], to Kafka.
- * The data is assumed to have a value column, and an optional topic and key
- * columns. If the topic column is missing, then the topic must come from
- * the 'topic' configuration option. If the key column is missing, then a
- * null valued key field will be added to the
+ * The [[KafkaWriter]] class is used to write data from a batch query or structured
+ * streaming query, given by a [[QueryExecution]], to Kafka. The data is assumed to
+ * have a value column, and an optional topic and key columns. If the topic column is
+ * missing, then the topic must come from the 'topic' or 'path' configuration option.
+ * If the key column is missing, then a null valued key field will be added to the
* [[org.apache.kafka.clients.producer.ProducerRecord]].
*/
private[kafka010] object KafkaWriter extends Logging {
@@ -44,36 +44,40 @@ private[kafka010] object KafkaWriter extends Logging {
def validateQuery(
schema: Seq[Attribute],
- kafkaParameters: ju.Map[String, Object],
- topic: Option[String] = None): Unit = {
- schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
- if (topic.isEmpty) {
- throw new AnalysisException(s"topic option required when no " +
+ topic: Option[String] = None,
+ newException: String => Exception): Unit = {
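+    // Validate the topic, key and value attributes, reporting problems through `newException`
+    // so each caller can raise its preferred exception type (e.g. AnalysisException vs
+    // IllegalStateException).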
+ topic.map(t => Literal.create(t, StringType))
+ .orElse(schema.find(_.name == TOPIC_ATTRIBUTE_NAME))
+ .getOrElse(
+ throw newException.apply(s"topic option is required when no " +
s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
- s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
- } else {
- Literal.create(topic.get, StringType)
- }
- ).dataType match {
- case StringType => // good
- case _ =>
- throw new AnalysisException(s"Topic type must be a ${StringType.catalogString}")
+          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} or " +
+ s"${DataSourceOptions.PATH_KEY} option for setting a topic."))
+ .dataType match {
+      case StringType => // good
+ case t =>
+ throw newException.apply(s"$TOPIC_ATTRIBUTE_NAME attribute type " +
+ s"must be a ${StringType.catalogString}, but was ${t.catalogString}")
}
+
schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
Literal(null, StringType)
).dataType match {
case StringType | BinaryType => // good
- case _ =>
- throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
- s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
+ case t =>
+ throw newException.apply(s"$KEY_ATTRIBUTE_NAME attribute type " +
+ s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}, " +
+ s"but was ${t.catalogString}")
}
+
schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
).dataType match {
case StringType | BinaryType => // good
- case _ =>
- throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
- s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
+ case t =>
+ throw newException.apply(s"$VALUE_ATTRIBUTE_NAME attribute type " +
+ s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}, " +
+ s"but was ${t.catalogString}")
}
}
@@ -83,7 +87,7 @@ private[kafka010] object KafkaWriter extends Logging {
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.analyzed.output
- validateQuery(schema, kafkaParameters, topic)
+ validateQuery(schema, topic, message => new AnalysisException(message))
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 07d2b8a5dc42..42af693b579b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.{DataStreamWriter, _}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -72,7 +72,75 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
}
- test("batch - null topic field value, and no topic option") {
+ test("batch - only path option specified") {
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+ val df = Seq("1", "2", "3").map(v => (null.asInstanceOf[String], v)).toDF("topic", "value")
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .save(topic)
+ checkAnswer(
+ createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
+ Row("1") :: Row("2") :: Row("3") :: Nil)
+ }
+
+ test("batch - topic, path and topic field value specified") {
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val df = Seq("1", "2", "3").map(v => (topic, v)).toDF("topic", "value")
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("topic", topic)
+ .save(topic)
+ checkAnswer(
+ createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
+ Row("1") :: Row("2") :: Row("3") :: Nil)
+ }
+
+ test("batch - different topic and path option values") {
+ val topic = newTopic()
+ val pathOptionTopic = newTopic()
+
+ val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+ val ex = intercept[IllegalArgumentException] {
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("topic", topic)
+ .save(pathOptionTopic)
+ }
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ "'topic' and 'path' options must match if both defined"))
+ }
+
+ test("batch - write to different topics") {
+ val topic1 = newTopic()
+ val topic2 = newTopic()
+ val topic3 = newTopic()
+ testUtils.createTopic(topic1)
+ testUtils.createTopic(topic2)
+ testUtils.createTopic(topic3)
+
+ val df = Seq(topic1, topic2, topic3).map(t => (t, "value")).toDF("topic", "value")
+ df.write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .save()
+ checkAnswer(
+ createKafkaReader(topic1).selectExpr("CAST(value as STRING) value"),
+ Row("value") :: Nil)
+ checkAnswer(
+ createKafkaReader(topic2).selectExpr("CAST(value as STRING) value"),
+ Row("value") :: Nil)
+ checkAnswer(
+ createKafkaReader(topic3).selectExpr("CAST(value as STRING) value"),
+ Row("value") :: Nil)
+ }
+
+ test("batch - null topic field value, and no topic or path option") {
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
val ex = intercept[SparkException] {
df.write
@@ -160,7 +228,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
}
}
- test("streaming - write aggregation w/o topic field, with topic option") {
+ test("streaming - write aggregation with topic field, path and topic option") {
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
@@ -168,38 +236,48 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
val writer = createKafkaWriter(
input.toDF().groupBy("value").count(),
withTopic = Some(topic),
+ withPath = Some(topic),
withOutputMode = Some(OutputMode.Update()))(
- withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value")
+ withSelectExpr = "'foo' as topic",
+ "CAST(value as STRING) key",
+ "CAST(count as STRING) value")
- val reader = createKafkaReader(topic)
- .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
- .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
- .as[(Int, Int)]
+ checkStreamAggregation(input, writer, topic)
+ }
- try {
- input.addData("1", "2", "2", "3", "3", "3")
- failAfter(streamingTimeout) {
- writer.processAllAvailable()
- }
- checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
- input.addData("1", "2", "3")
- failAfter(streamingTimeout) {
- writer.processAllAvailable()
- }
- checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
- } finally {
- writer.stop()
- }
+ test("streaming - write aggregation with topic and path option") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF().groupBy("value").count(),
+ withTopic = Some(topic),
+ withPath = Some(topic),
+ withOutputMode = Some(OutputMode.Update()))(
+ withSelectExpr = "CAST(value as STRING) key",
+ "CAST(count as STRING) value")
+
+ checkStreamAggregation(input, writer, topic)
+ }
+
+ test("streaming - write aggregation with path option") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF().groupBy("value").count(),
+ withTopic = None,
+ withPath = Some(topic),
+ withOutputMode = Some(OutputMode.Update()))(
+ withSelectExpr = "CAST(value as STRING) key",
+ "CAST(count as STRING) value")
+
+ checkStreamAggregation(input, writer, topic)
}
- test("streaming - aggregation with topic field and topic option") {
- /* The purpose of this test is to ensure that the topic option
- * overrides the topic field. We begin by writing some data that
- * includes a topic field and value (e.g., 'foo') along with a topic
- * option. Then when we read from the topic specified in the option
- * we should see the data i.e., the data was written to the topic
- * option, and not to the topic in the data e.g., foo
- */
+ test("streaming - write aggregation with topic field and topic option") {
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
@@ -207,29 +285,30 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
val writer = createKafkaWriter(
input.toDF().groupBy("value").count(),
withTopic = Some(topic),
+ withPath = None,
withOutputMode = Some(OutputMode.Update()))(
- withSelectExpr = "'foo' as topic",
- "CAST(value as STRING) key", "CAST(count as STRING) value")
+ withSelectExpr = "'foo' as topic",
+ "CAST(value as STRING) key",
+ "CAST(count as STRING) value")
- val reader = createKafkaReader(topic)
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
- .as[(Int, Int)]
+ checkStreamAggregation(input, writer, topic)
+ }
- try {
- input.addData("1", "2", "2", "3", "3", "3")
- failAfter(streamingTimeout) {
- writer.processAllAvailable()
- }
- checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
- input.addData("1", "2", "3")
- failAfter(streamingTimeout) {
- writer.processAllAvailable()
- }
- checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
- } finally {
- writer.stop()
- }
+ test("streaming - write aggregation with topic field and path option") {
+ val input = MemoryStream[String]
+ val topic = newTopic()
+ testUtils.createTopic(topic)
+
+ val writer = createKafkaWriter(
+ input.toDF().groupBy("value").count(),
+ withTopic = None,
+ withPath = Some(topic),
+ withOutputMode = Some(OutputMode.Update()))(
+ withSelectExpr = "'foo' as topic",
+ "CAST(value as STRING) key",
+ "CAST(count as STRING) value")
+
+ checkStreamAggregation(input, writer, topic)
}
test("streaming - sink progress is produced") {
@@ -275,7 +354,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
}
assert(ex.getMessage
.toLowerCase(Locale.ROOT)
- .contains("topic option required when no 'topic' attribute is present"))
+ .contains("topic option is required when no 'topic' attribute is present"))
try {
/* No value field */
@@ -312,7 +391,9 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
+ assert(
+ ex.getMessage.toLowerCase(Locale.ROOT)
+ .contains("topic attribute type must be a string, but was int"))
try {
/* value field wrong type */
@@ -435,13 +516,14 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
private def createKafkaWriter(
input: DataFrame,
withTopic: Option[String] = None,
+ withPath: Option[String] = None,
withOutputMode: Option[OutputMode] = None,
withOptions: Map[String, String] = Map[String, String]())
(withSelectExpr: String*): StreamingQuery = {
var stream: DataStreamWriter[Row] = null
withTempDir { checkpointDir =>
var df = input.toDF()
- if (withSelectExpr.length > 0) {
+ if (withSelectExpr.nonEmpty) {
df = df.selectExpr(withSelectExpr: _*)
}
stream = df.writeStream
@@ -454,6 +536,34 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
withOutputMode.foreach(stream.outputMode(_))
withOptions.foreach(opt => stream.option(opt._1, opt._2))
}
- stream.start()
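+    // When a path is given, pass it to start() so it reaches the sink as the "path" option.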
+ withPath match {
+ case Some(path) => stream.start(path)
+ case _ => stream.start()
+ }
+ }
+
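+  /**
+   * Feeds two batches of data into `input` and verifies that the aggregated
+   * (value, count) pairs written by `stream` can be read back from `topic`.
+   */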
+ private def checkStreamAggregation(
+ input: MemoryStream[String],
+ stream: StreamingQuery,
+ topic: String): Unit = {
+ val dataset = createKafkaReader(topic)
+ .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+ .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+ .as[(Int, Int)]
+
+ try {
+ input.addData("1", "2", "2", "3", "3", "3")
+ failAfter(streamingTimeout) {
+ stream.processAllAvailable()
+ }
+ checkDatasetUnorderly(dataset, (1, 1), (2, 2), (3, 3))
+ input.addData("1", "2", "3")
+ failAfter(streamingTimeout) {
+ stream.processAllAvailable()
+ }
+ checkDatasetUnorderly(dataset, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+ } finally {
+ stream.stop()
+ }
}
}