177 changes: 151 additions & 26 deletions docs/structured-streaming-kafka-integration.md
@@ -428,13 +428,15 @@ The Dataframe being written to Kafka should have the following columns in schema
<td>string</td>
</tr>
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>
\* The topic column is required if the "topic" or "path" configuration option is not specified.<br>

The value column is the only required option. If a key column is not specified then
a ```null``` valued key column will be automatically added (see Kafka semantics on
how ```null``` valued key values are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set i.e., the "topic" configuration option overrides the topic column.
is used as the topic when writing the given row to Kafka, unless the "topic" or "path"
configuration option is set. In other words, the "topic" option overrides both the "path"
option and the topic column, and the "path" option overrides the topic column.
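
For example, setting the "topic" and "path" options to different values is rejected rather than
silently preferring one of them. A minimal sketch of the failure case (assuming df has key and
value columns that can be cast to string; the same check applies to streaming queries started
with writeStream):

{% highlight scala %}
// "topic" and "path" disagree here, so the write fails with an
// IllegalArgumentException instead of picking one of the two topics.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topicA")
  .save("topicB")   // conflicts with the "topic" option above
{% endhighlight %}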

The following options must be set for the Kafka sink
for both batch and streaming queries.
@@ -457,8 +459,17 @@ The following configurations are optional:
<td>string</td>
<td>none</td>
<td>streaming and batch</td>
<td>Sets the topic that all rows will be written to in Kafka. This option overrides the
"path" option and any topic column that may exist in the data.</td>
</tr>
<tr>
<td>path</td>
<td>string</td>
<td>none</td>
<td>streaming and batch</td>
<td>Sets the topic that all rows will be written to in Kafka. This option overrides any
topic column that may exist in the data.</td>
topic column that may exist in the data, and is itself overridden by the "topic" option.
However, if both the "topic" and "path" options are specified, they must match.</td>
</tr>
</table>

@@ -468,67 +479,127 @@ The following configurations are optional:
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
val q = df
.select($"key" cast "string", $"value" cast "string")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
// Write key-value data from a DataFrame to Kafka using the topic specified in the data
val q = df
.select($"topic", $"key" cast "string", $"value" cast "string")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()

// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
val q = df
.select($"key" cast "string", $"value" cast "string")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start("topic2")

// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and both the
// "topic" and "path" options are specified. In that case the topic is taken from the "topic" option.
// You do not need to specify the topic in all three places, but be aware that if both the "topic"
// and "path" options are specified, they must match.
val q = df
.select($"topic", $"key" cast "string", $"value" cast "string")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic3")
.start("topic3")

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
StreamingQuery q = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
// Write key-value data from a DataFrame to Kafka using the topic specified in the data
StreamingQuery q = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();

// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
StreamingQuery q = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Review comments on this line:

Contributor: nit: Since you added it, I'd change it to .select($"key" cast "string", $"value" cast "string") even if it's the only place in the doc. It's simply more type-safe.

@Nimfadora (Author, Feb 26, 2019): Would it be better to change this in all places in this doc file, since we are touching it anyway?

Contributor: Why make a deviation compared to all other places?

@gaborgsomogyi (Contributor, Feb 26, 2019): With the latest change I don't fully understand the direction. Now half of the doc uses selectExpr and the other half uses select. Still questioning the need: what is the benefit? If the selectExpr is executed and properly formed, it is no less stable than the plain select in the doc, right?

@Nimfadora (Author, Feb 26, 2019): We are trying to make this expression type-safe, so that the user sees mistakes at compile time if there are any. select is used only for Scala, as the shortened $ syntax is not available in Java and Python, but we could use select(col("key").cast("string")) in all three languages if we really want it to be both type-safe and look the same in all three cases. What do you think?

@gaborgsomogyi (Contributor, Mar 1, 2019): I see the intention now, thanks for the explanation. I would personally leave it as it is in the original code, but I am not saying the direction is wrong. Let's have the committer who picks this up decide.

@Nimfadora (Author): @jaceklaskowski what do you think: should we revert the changes here, or use select(col("key").cast("string")) in all places to provide type safety?

[A sketch of the select(col("key").cast("string")) form appears after the example below.]
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start("topic2");

// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and both the
// "topic" and "path" options are specified. In that case the topic is taken from the "topic" option.
// You do not need to specify the topic in all three places, but be aware that if both the "topic"
// and "path" options are specified, they must match.
StreamingQuery q = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic3")
.start("topic3");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
# Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
q = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
# Write key-value data from a DataFrame to Kafka using the topic specified in the data
q = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()

# Write key-value data from a DataFrame to Kafka using the topic specified in the path option
q = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start("topic2")

# Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and both the
# "topic" and "path" options are specified. In that case the topic is taken from the "topic" option.
# You do not need to specify the topic in all three places, but be aware that if both the "topic"
# and "path" options are specified, they must match.
q = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic3") \
.start("topic3")

{% endhighlight %}
</div>
</div>
@@ -539,61 +610,115 @@ ds = df \
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
df.select($"key" cast "string", $"value" cast "string")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
// Write key-value data from a DataFrame to Kafka using the topic specified in the data
df.select($"topic", $"key" cast "string", $"value" cast "string")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()

// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
df.select($"key" cast "string", $"value" cast "string")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save("topic2")

// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and both the
// "topic" and "path" options are specified. In that case the topic is taken from the "topic" option.
// You do not need to specify the topic in all three places, but be aware that if both the "topic"
// and "path" options are specified, they must match.
df.select($"topic", $"key" cast "string", $"value" cast "string")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic3")
.save("topic3")

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
// Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
// Write key-value data from a DataFrame to Kafka using the topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();

// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save("topic2");

// Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and both the
// "topic" and "path" options are specified. In that case the topic is taken from the "topic" option.
// You do not need to specify the topic in all three places, but be aware that if both the "topic"
// and "path" options are specified, they must match.
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic3")
.save("topic3");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
# Write key-value data from a DataFrame to Kafka using the topic specified in the topic option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
# Write key-value data from a DataFrame to Kafka using the topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()

# Write key-value data from a DataFrame to Kafka using the topic specified in the path option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save("topic2")

# Write key-value data from a DataFrame to Kafka when the DataFrame has a topic column and both the
# "topic" and "path" options are specified. In that case the topic is taken from the "topic" option.
# You do not need to specify the topic in all three places, but be aware that if both the "topic"
# and "path" options are specified, they must match.
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic3") \
.save("topic3")

{% endhighlight %}
</div>
</div>
@@ -144,7 +144,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val defaultTopic = resolveTopic(parameters)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic)
}
@@ -161,7 +161,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val topic = resolveTopic(parameters)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, specifiedKafkaParams,
topic)
@@ -196,6 +196,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
throw new IllegalArgumentException("Unknown option")
}
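
// Resolution rules illustrated (a summary of the resolveTopic method that follows;
// TOPIC_OPTION_KEY and DataSourceOptions.PATH_KEY are assumed to correspond to the
// user-facing "topic" and "path" options):
//   only "topic" set        -> use the "topic" value
//   only "path" set         -> use the "path" value
//   both set and equal      -> use the common value
//   both set but different  -> throw IllegalArgumentException
//   neither set             -> None (the topic must come from a topic column in the data)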

private def resolveTopic(parameters: Map[String, String]): Option[String] = {
val topicOption = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val pathOption = parameters.get(DataSourceOptions.PATH_KEY).map(_.trim)

(topicOption, pathOption) match {
case (Some(t), Some(p)) if t != p =>
throw new IllegalArgumentException("'topic' and 'path' options must match"
+ " if both defined: '" + t + "' must match '" + p + "'")
case (Some(_), _) => topicOption
case (None, Some(_)) => pathOption
case _ => None
}
}

private def failOnDataLoss(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.getOrElse(FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toBoolean

@@ -378,8 +392,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
import scala.collection.JavaConverters._

assert(inputSchema != null)
val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asScala.toMap)
val params = options.asScala.toMap
val topic = resolveTopic(params)
val producerParams = kafkaParamsForProducer(params)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
@@ -46,7 +47,7 @@ class KafkaStreamingWrite(
schema: StructType)
extends StreamingWrite {

validateQuery(schema.toAttributes, producerParams, topic)
validateQuery(schema.toAttributes, topic, m => new AnalysisException(m))

override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, schema)