@ghost ghost commented Feb 23, 2017

What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.

Streaming Kafka Sink

  • When addBatch is called:
    • If batchId is greater than the last written batch, write the batch to Kafka (sketched below).
      • The topic is taken from the record's topic field, if present; the topic option, when specified, overrides the topic in the record.
    • Otherwise, ignore the batch (it has already been written).
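For illustration, a minimal sketch of what such a sink could look like. `KafkaWriter.write` is an assumed helper that produces each row's key/value to Kafka; the wiring is illustrative, not the merged implementation.

```scala
import java.{util => ju}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

private[kafka010] class KafkaSink(
    sqlContext: SQLContext,
    executorKafkaParams: ju.Map[String, Object],
    topic: Option[String]) extends Sink with Logging {

  // Remember the last batch written so that a retried batch is not produced twice.
  @volatile private var latestBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      // Assumed helper: writes each row, resolving the topic from the 'topic' option
      // (which overrides any topic column) or from the row itself.
      KafkaWriter.write(sqlContext.sparkSession, data.queryExecution,
        executorKafkaParams, topic)
      latestBatchId = batchId
    }
  }

  override def toString: String = "KafkaSink"
}
```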

Batch Kafka Sink

  • KafkaSourceProvider will implement CreatableRelationProvider.
  • CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka (see the sketch below).
  • The topic is taken from the record's topic field, if present; the topic option, when specified, overrides the topic in the record.
  • Save modes Append and ErrorIfExists are supported with identical semantics; any other save mode results in an AnalysisException.
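A rough sketch of the batch path as well, shown as a standalone provider for brevity (the PR folds it into KafkaSourceProvider) and reusing the assumed KafkaWriter helper; the returned BaseRelation is only a placeholder, since Kafka cannot back a readable relation here.

```scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

private[kafka010] class KafkaWriteProvider extends CreatableRelationProvider {

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    mode match {
      case SaveMode.Overwrite | SaveMode.Ignore =>
        throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
          s"Allowed save modes are ${SaveMode.Append} and ${SaveMode.ErrorIfExists} (default).")
      case _ => // Append and ErrorIfExists behave identically for Kafka.
    }
    val topic = parameters.get("topic").map(_.trim)

    // Collect "kafka."-prefixed options into producer params. The byte-array
    // serializers would also need to be set here; omitted for brevity.
    val producerParams = new java.util.HashMap[String, Object]()
    parameters.foreach { case (k, v) =>
      if (k.toLowerCase.startsWith("kafka.")) producerParams.put(k.drop(6), v)
    }

    KafkaWriter.write(sqlContext.sparkSession, df.queryExecution, producerParams, topic)

    // Kafka cannot return a readable relation for the data just written, so return
    // a stub whose members fail loudly if it is ever used by mistake.
    new BaseRelation {
      override def sqlContext: SQLContext = unsupported
      override def schema: StructType = unsupported
      private def unsupported = throw new UnsupportedOperationException(
        "BaseRelation from Kafka write operation is not usable.")
    }
  }
}
```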

@tdas @zsxwing

How was this patch tested?

The following unit tests will be included

  • write to stream with topic field: valid stream write with data that includes an existing topic in the schema
  • write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
  • write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
  • write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
  • write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
  • write batch to kafka: a simple batch write to Kafka, which goes through the same code path as the streaming scenario, so validity checks are not repeated here.

Examples

// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
 .selectExpr("value as key", "value as value")
 .writeStream
 .format("kafka")
 .option("checkpointLocation", checkpointDir)
 .outputMode(OutputMode.Append)
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .queryName("kafkaStream")
 .start()

// Batch (assumes `import spark.implicits._` is in scope for toDF)
val df = spark
 .sparkContext
 .parallelize(Seq("1", "2", "3", "4", "5"))
 .map(v => (topic, v))
 .toDF("topic", "value")

df.write
 .format("kafka")
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .save()

Please review http://spark.apache.org/contributing.html before opening a pull request.

Tyson Condie added 2 commits February 24, 2017 10:06

SparkQA commented Feb 24, 2017

Test build #73436 has finished for PR 17043 at commit 129cfcd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 24, 2017

Test build #73438 has finished for PR 17043 at commit 67e3c06.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 24, 2017

Test build #73443 has finished for PR 17043 at commit e6b6dc1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing zsxwing (Member) left a comment

Made one pass. Most of my comments are minor.

import org.apache.spark.sql.execution.streaming.Sink

private[kafka010] class KafkaSink(
sqlContext: SQLContext,
Member

nit: 4 spaces

sqlContext: SQLContext,
executorKafkaParams: ju.Map[String, Object],
defaultTopic: Option[String]) extends Sink with Logging {
var latestBatchId = -1L
Member

nit: private

val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
val TOPIC_OPTION_KEY = "topic"
Member

nit: looks like only this one needs to be public

partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
val defaultTopic = caseInsensitiveParams.get(TOPIC_OPTION_KEY).map(_.trim.toLowerCase)
Member

Remove toLowerCase. Kafka topics are case sensitive.

outputMode: OutputMode): Sink = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
val defaultTopic = caseInsensitiveParams.get(TOPIC_OPTION_KEY).map(_.trim.toLowerCase)
val specifiedKafkaParams =
Member

Need to throw an exception if the user specifies serializer like the source. Also need to add tests.

    if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}")) {
      throw new IllegalArgumentException(
        s"Kafka option '${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}' is not supported as keys "
          + "are deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations "
          + "to explicitly deserialize the keys.")
    }

    if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}"))
    {
      throw new IllegalArgumentException(
        s"Kafka option '${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}' is not supported as "
          + "value are deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame "
          + "operations to explicitly deserialize the values.")
    }

"save mode overwrite not allowed for kafka"))
}

test("write big data with small producer buffer") {
Member

Could you clarify what's the purpose of this test?

}

test("write batch with null topic field value, and no topic option") {
val df = spark
Member

nit: val df = Seq[(String, String)](null -> "1").toDF("topic", "value")

test("write batch unsupported save modes") {
val topic = newTopic()
testUtils.createTopic(topic)
val df = spark
Member

nit: val df = Seq[(String, String)](null -> "1").toDF("topic", "value")

test("write batch to kafka") {
val topic = newTopic()
testUtils.createTopic(topic)
val df = spark
Member

nit: val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")

override def runAction(): Unit = {
ms.addData(values)
q.processAllAvailable()
Thread.sleep(5000) // wait for data to appear in Kafka
Member

Don't use sleep. How about passing a latest offset to AddMoreData and waiting until you can see it via KafkaTestUtils.getLatestOffsets?

Contributor

@tcondie see my higher-level comments about refactoring the tests.

@tdas tdas (Contributor) left a comment

It looks quite good, but some changes are still needed.

  • The tests can be improved a lot; they are too verbose.
  • Schema validation needs to happen before the sink is created, so that an AnalysisException is thrown before the query has started.

import org.apache.spark.sql.execution.streaming.Sink

private[kafka010] class KafkaSink(
sqlContext: SQLContext,
Contributor

incorrect indent

sqlContext: SQLContext,
executorKafkaParams: ju.Map[String, Object],
defaultTopic: Option[String]) extends Sink with Logging {
var latestBatchId = -1L
Contributor

make this volatile, just in case we ever parallelize things.

}

}

Contributor

revert

private[kafka010] class KafkaSink(
sqlContext: SQLContext,
executorKafkaParams: ju.Map[String, Object],
defaultTopic: Option[String]) extends Sink with Logging {
Contributor

change to topic.

data.queryExecution, executorKafkaParams, defaultTopic)
latestBatchId = batchId
}
}
Contributor

Set a good toString() so that it shows up nicely in the StreamingQueryProgress.
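For example, something as small as this (field name assumed) would already help:

```scala
// Illustrative only: include identifying info so the sink reads well in progress reports.
override def toString: String = s"KafkaSink(topic=$topic)"
```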

}


test("write data with bad schema") {
Contributor

A bad-schema error should be thrown immediately when start() is called, rather than later after the query has started. You have to validate before creating the sink object for this.
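A minimal sketch of such up-front validation (helper name and messages assumed), invoked before the sink object is constructed so that start() fails fast:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Attribute

// Sketch only: reject a bad schema before any query is started.
private def validateQuery(schema: Seq[Attribute], topic: Option[String]): Unit = {
  if (topic.isEmpty && !schema.exists(_.name == "topic")) {
    throw new AnalysisException(
      "topic option required when no 'topic' attribute is present in the data.")
  }
  if (!schema.exists(_.name == "value")) {
    throw new AnalysisException("Required attribute 'value' not found.")
  }
}
```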


private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

test("write to stream with topic field") {
@tdas tdas (Contributor) Feb 24, 2017

This is a hugely verbose and roundabout way of testing Kafka: using the streaming source to test the streaming sink is complicated and hard to debug when things go wrong. I think we should investigate a better way. How about you try this, similar to FileStreamSinkSuite:

  • Use memory stream as you are doing, just use AddData (no need for AddMoreData)
  • Define a function called checkKafka(expectedData) which will
    • processAllAvailable
    • read the data in Kafka as a batch query and verify the results.

Then the test would look simpler.

testStream (
   AddData(...)
   checkKafka(...)
   AddData(...)
   checkKafka(...)
...
)

Here is what the checkKafka method would look like. See
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala#L22

def checkKafka[T](expectedResults: T*): AssertOnQuery = AssertOnQuery { q => 
   q.processAllAvailable
   val kafkaData = spark.read.format("kafka") ....
   checkDataset(expectedResults, kafkaData)
}


private val topicId = new AtomicInteger(0)

private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
Contributor

I would organize the tests as follows:

  • tests for batch queries first, as they exercise fewer code paths
    • names should start with "batch - "
    • these would cover all the corner cases, e.g., topic not present
  • tests for streaming queries
    • names should start with "streaming - "

.selectExpr(s"'$topic' as topic", "CAST(value as INT) as key", "value")
.writeStream
.format("kafka")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
Contributor

Why do you need a checkpoint location for this test? Simplify each test as much as possible, with as few requirements as possible.

Also, a lot of this code is duplicated; make internal helper functions to reduce the duplication.

writer = input.toDF()
.writeStream
.format("kafka")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
Contributor

why do you need checkpointing?

count += 1
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)

@tdas tdas (Contributor) Feb 24, 2017

nit: extra line
and you can make this code simpler

val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
val row = new SpecificInternalRow(fieldTypes)
row.update(0, data)
val iter = Seq.fill(1000)(converter.apply(row)).iterator
writeTask.execute(iter)


SparkQA commented Feb 28, 2017

Test build #73614 has finished for PR 17043 at commit b48f173.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private[kafka010] class KafkaSink(
sqlContext: SQLContext,
executorKafkaParams: ju.Map[String, Object],
defaultTopic: Option[String]) extends Sink with Logging {
Contributor

nit: this is still called defaultTopic; it's not a default one.


SparkQA commented Mar 3, 2017

Test build #73870 has finished for PR 17043 at commit 2dd3ffb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing zsxwing (Member) left a comment

Overall looks good. Left some minor comments.

s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim.toLowerCase)
Member

Remove toLowerCase

* We cannot support this for Kafka. Therefore, in order to make things consistent,
* we return an empty base relation.
*/
new BaseRelation {
Member

Looks like this return value is used in CreateDataSourceTableAsSelectCommand. Kafka cannot support it. I think it's better to make the methods of this special BaseRelation throw UnsupportedOperationException in case the returned relation is used by mistake.

.keySet
.filter(_.toLowerCase.startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap + (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[BytesSerializer].getName,
Member

ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG

After fixing them, KafkaWriteTask doesn't need to set these configs.
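In other words, something along these lines (a sketch, not the final code), so the serializers are fixed once when the producer parameters are built and KafkaWriteTask does not have to set them:

```scala
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer

// Sketch only: keep the "kafka."-prefixed options and pin the byte-array serializers.
private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
  parameters
    .keySet
    .filter(_.toLowerCase.startsWith("kafka."))
    .map { k => k.drop(6) -> parameters(k) }
    .toMap ++ Map(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
}
```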

inputSchema: Seq[Attribute],
topic: Option[String]) {
// used to synchronize with Kafka callbacks
@volatile var failedWrite: Exception = null
Member

nit: private

topic: Option[String]) {
// used to synchronize with Kafka callbacks
@volatile var failedWrite: Exception = null
val projection = createProjection
Member

nit: private

// used to synchronize with Kafka callbacks
@volatile var failedWrite: Exception = null
val projection = createProjection
var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
Member

nit: private


private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}")) {
Member

ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG

Please also fix the exception message.
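For reference, a sketch of what the corrected check and message might look like (wording assumed; `caseInsensitiveParams` comes from the snippet above):

```scala
// Sketch only: reject a user-supplied key serializer on the producer path.
if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
  throw new IllegalArgumentException(
    s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
      + "are serialized with ByteArraySerializer. Use DataFrame operations to explicitly "
      + "serialize the keys.")
}
```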

+ "to explicitly deserialize the keys.")
}

if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}"))
Member

ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG

Please also fix the exception message.

.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.save()
Member

This test should read from Kafka and compare the results in order to verify the results were written correctly.


SparkQA commented Mar 6, 2017

Test build #74034 has finished for PR 17043 at commit b1d554a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

private def createKafkaReader(topic: String): DataFrame = {
Contributor

super nit: can you move these utility functions (topicId, newTopic, createKafka*) to the end of the class, so that the tests are earlier in the class.

.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.save()
checkAnswer(createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
Contributor

super nit: formatting.

checkAnswer(
  createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
  Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)

input: DataFrame,
withTopic: Option[String] = None,
withOutputMode: Option[OutputMode] = None,
withOptions: Option[Map[String, String]] = None)
Contributor

This can default to an empty Map instead of using an Option around a Map.


tdas commented Mar 6, 2017

LGTM to the code except minor nits in tests. Please update the title (remove WIP) and description (add design) and then we can merge this.

@ghost ghost changed the title [SPARK-19719][SS][WIP] Kafka writer for both structured streaming and batch queries [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries Mar 6, 2017

SparkQA commented Mar 6, 2017

Test build #74043 has finished for PR 17043 at commit 107e513.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


tdas commented Mar 7, 2017

Merging this to master. Thank you very much @tcondie

@asfgit asfgit closed this in b0a5cd8 Mar 7, 2017
asfgit pushed a commit that referenced this pull request Mar 7, 2017

Author: Tyson Condie <tcondie@gmail.com>

Closes #17043 from tcondie/kafka-writer.