Changes from all commits (48 commits)
d371758  add kafka relation and refactor kafka source (Jan 20, 2017)
b6c3055  update (Jan 20, 2017)
4c81812  update (Jan 20, 2017)
ab02a4c  single kafka provider for both stream and batch (Jan 20, 2017)
e6b57ed  added uninterruptible thread version of kafka offset reader (Jan 24, 2017)
ff94ed8  added uninterruptible thread version of kafka offset reader (Jan 24, 2017)
f8fd34c  update tests (Jan 24, 2017)
41271e2  resolve conflicts in KafakSource (Jan 24, 2017)
74d96fc  update comments (Jan 24, 2017)
d31fc81  address comments from @zsxwing (Jan 25, 2017)
1db1649  update (Jan 25, 2017)
3b0d48b  Merge branch 'master' of https://github.com/apache/spark into SPARK-1… (Jan 25, 2017)
a5b0269  address comments from @zsxwing (Jan 26, 2017)
c08c01f  late binding offsets (Jan 27, 2017)
79d335e  update to late binding logic (Jan 27, 2017)
a44b365  Merge branch 'SPARK-18682' into kafka-writer (Jan 27, 2017)
51291e3  remove kafka log4j debug (Jan 27, 2017)
b597cf1  remove kafka log4j debug (Jan 27, 2017)
84b32c5  Merge branch 'SPARK-18682' into kafka-writer (Jan 30, 2017)
f5ae301  update (Jan 31, 2017)
2487a72  address comments from @zsxwing (Jan 31, 2017)
789d3af  update (Jan 31, 2017)
56a06e7  Merge branch 'SPARK-18682' into kafka-writer (Feb 1, 2017)
e74473b  update (Feb 2, 2017)
73df054  update (Feb 2, 2017)
5b48fc6  address comments from @tdas (Feb 3, 2017)
5776009  address feedback from @tdas and @sxwing (Feb 3, 2017)
63d453f  update merge (Feb 3, 2017)
3c4eecf  update (Feb 3, 2017)
b0611e4  update (Feb 16, 2017)
3c6a52b  update (Feb 17, 2017)
8ba33a7  update (Feb 23, 2017)
68a2a18  update (Feb 23, 2017)
c8c38e1  update (Feb 23, 2017)
71f8de0  update (Feb 23, 2017)
c4c9395  address comments from @tdas (Feb 24, 2017)
8f5da8b  update (Feb 24, 2017)
c85b803  update (Feb 24, 2017)
9d7a00d  update (Feb 24, 2017)
66fa01b  update (Feb 24, 2017)
129cfcd  update (Feb 24, 2017)
67e3c06  update (Feb 24, 2017)
e6b6dc1  revise exceptions and topic option (Feb 24, 2017)
3981d7b  Merge branch 'master' of https://github.com/apache/spark into kafka-w… (Feb 27, 2017)
b48f173  address comments from @tdas @zsxwing (Feb 28, 2017)
2dd3ffb  update (Mar 3, 2017)
b1d554a  address comments from @zsxwing (Mar 6, 2017)
107e513  update (Mar 6, 2017)
KafkaSink.scala (new file)
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

private[kafka010] class KafkaSink(
sqlContext: SQLContext,
executorKafkaParams: ju.Map[String, Object],
topic: Option[String]) extends Sink with Logging {
@volatile private var latestBatchId = -1L

override def toString(): String = "KafkaSink"

override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
logInfo(s"Skipping already committed batch $batchId")
} else {
KafkaWriter.write(sqlContext.sparkSession,
data.queryExecution, executorKafkaParams, topic)
latestBatchId = batchId
}
}
}
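For context, a minimal usage sketch of this sink (broker address, topic name, and checkpoint path are placeholders, and the built-in "rate" source is assumed only for illustration): the sink is selected with format("kafka"), producer configs are passed with the "kafka." prefix, and the "topic" option supplies a default topic when the data carries no topic column.

// Illustrative sketch only; not part of this change.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaSinkExample").getOrCreate()

val query = spark.readStream
  .format("rate")                                        // any streaming source works here
  .load()
  .selectExpr("CAST(value AS STRING) AS value")          // "value" is the only required column
  .writeStream
  .format("kafka")                                       // resolves to KafkaSourceProvider / KafkaSink
  .option("kafka.bootstrap.servers", "localhost:9092")   // producer configs use the "kafka." prefix
  .option("topic", "events")                             // default topic (TOPIC_OPTION_KEY)
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()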
KafkaSourceProvider.scala
@@ -23,21 +23,27 @@ import java.util.UUID
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/**
* The provider class for the [[KafkaSource]]. This provider is designed such that it throws
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSourceProvider
with RelationProvider with Logging {
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with Logging {
import KafkaSourceProvider._

override def shortName(): String = "kafka"
@@ -152,6 +158,72 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
endingRelationOffsets)
}

override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
new KafkaSink(sqlContext,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
}

override def createRelation(
outerSQLContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
mode match {
case SaveMode.Overwrite | SaveMode.Ignore =>
throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
s"Allowed save modes are ${SaveMode.Append} and " +
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)

/* This method is supposed to return a relation that reads the data that was written.
* We cannot support this for Kafka. Therefore, in order to make things consistent,
* we return an empty base relation.
*/
new BaseRelation {
Review comment (Member): Looks like this return value is called in CreateDataSourceTableAsSelectCommand. Kafka cannot support it. I think it's better to make the methods of this special BaseRelation throw UnsupportedOperationException in case the returned relation is used by mistake.
override def sqlContext: SQLContext = unsupportedException
override def schema: StructType = unsupportedException
override def needConversion: Boolean = unsupportedException
override def sizeInBytes: Long = unsupportedException
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
private def unsupportedException =
throw new UnsupportedOperationException("BaseRelation from Kafka write " +
"operation is not usable.")
}
}

private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
+ "are serialized with ByteArraySerializer.")
}

if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
{
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
+ "value are serialized with ByteArraySerializer.")
}
parameters
.keySet
.filter(_.toLowerCase.startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
}

private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
ConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -381,6 +453,7 @@ private[kafka010] object KafkaSourceProvider {
private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
val TOPIC_OPTION_KEY = "topic"

private val deserClassName = classOf[ByteArrayDeserializer].getName
}
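A hedged sketch of the batch path handled by createRelation above (broker address and topic name are placeholders): df.write with format("kafka") goes through the CreatableRelationProvider interface, only Append and ErrorIfExists save modes are accepted, and key/value serializers are fixed to ByteArraySerializer, so passing kafka.key.serializer or kafka.value.serializer is rejected by kafkaParamsForProducer.

// Illustrative sketch only; not part of this change.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("KafkaBatchWriteExample").getOrCreate()
import spark.implicits._

val df = Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")                 // default topic for rows without a topic column
  .mode(SaveMode.Append)                     // Overwrite and Ignore throw AnalysisException
  .save()

// Not allowed: serializers are always ByteArraySerializer, so this
// throws IllegalArgumentException in kafkaParamsForProducer:
//   .option("kafka.key.serializer", "org.apache.kafka.common.serialization.StringSerializer")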
KafkaWriteTask.scala (new file)
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.clients.producer.{KafkaProducer, _}
import org.apache.kafka.common.serialization.ByteArraySerializer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
import org.apache.spark.sql.types.{BinaryType, StringType}

/**
* A simple class for writing out data in a single Spark task, without any concerns about how
* to commit or abort tasks. Exceptions thrown by the implementation of this class will
* automatically trigger task aborts.
*/
private[kafka010] class KafkaWriteTask(
producerConfiguration: ju.Map[String, Object],
inputSchema: Seq[Attribute],
topic: Option[String]) {
// used to synchronize with Kafka callbacks
@volatile private var failedWrite: Exception = null
private val projection = createProjection
private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _

/**
* Writes key value data out to topics.
*/
def execute(iterator: Iterator[InternalRow]): Unit = {
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
while (iterator.hasNext && failedWrite == null) {
val currentRow = iterator.next()
val projectedRow = projection(currentRow)
val topic = projectedRow.getUTF8String(0)
val key = projectedRow.getBinary(1)
val value = projectedRow.getBinary(2)
if (topic == null) {
throw new NullPointerException(s"null topic present in the data. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
}
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
val callback = new Callback() {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (failedWrite == null && e != null) {
failedWrite = e
}
}
}
producer.send(record, callback)
}
}

def close(): Unit = {
if (producer != null) {
checkForErrors()
producer.close()
checkForErrors()
producer = null
}
}

private def createProjection: UnsafeProjection = {
val topicExpression = topic.map(Literal(_)).orElse {
inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
}.getOrElse {
throw new IllegalStateException(s"topic option required when no " +
s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
}
topicExpression.dataType match {
case StringType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
s"must be a ${StringType}")
}
val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
.getOrElse(Literal(null, BinaryType))
keyExpression.dataType match {
case StringType | BinaryType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t")
}
val valueExpression = inputSchema
.find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
throw new IllegalStateException(s"Required attribute " +
s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
)
valueExpression.dataType match {
case StringType | BinaryType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t")
}
UnsafeProjection.create(
Seq(topicExpression, Cast(keyExpression, BinaryType),
Cast(valueExpression, BinaryType)), inputSchema)
}

private def checkForErrors(): Unit = {
if (failedWrite != null) {
throw failedWrite
}
}
}
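A small illustrative sketch of the projection logic above (broker address and topic names are placeholders): when the frame itself has a "topic" column, each row is routed to that per-row topic and no "topic" option is needed; the "key" column may be omitted, in which case a null key is attached to every ProducerRecord; a null value in the topic column fails the write.

// Illustrative sketch only; not part of this change.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaWriteTaskExample").getOrCreate()
import spark.implicits._

// Rows go to the topics named in their own "topic" column; there is no "key"
// column, so a null key is sent with every record.
Seq(("alerts", "disk full"), ("metrics", "cpu=0.42"))
  .toDF("topic", "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()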

KafkaWriter.scala (new file)
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.types.{BinaryType, StringType}
import org.apache.spark.util.Utils

/**
* The [[KafkaWriter]] class is used to write data from a batch query
* or structured streaming query, given by a [[QueryExecution]], to Kafka.
* The data is assumed to have a value column, and an optional topic and key
* columns. If the topic column is missing, then the topic must come from
* the 'topic' configuration option. If the key column is missing, then a
* null valued key field will be added to the
* [[org.apache.kafka.clients.producer.ProducerRecord]].
*/
private[kafka010] object KafkaWriter extends Logging {
val TOPIC_ATTRIBUTE_NAME: String = "topic"
val KEY_ATTRIBUTE_NAME: String = "key"
val VALUE_ATTRIBUTE_NAME: String = "value"

override def toString: String = "KafkaWriter"

def validateQuery(
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
if (topic.isEmpty) {
throw new AnalysisException(s"topic option required when no " +
s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
} else {
Literal(topic.get, StringType)
}
).dataType match {
case StringType => // good
case _ =>
throw new AnalysisException(s"Topic type must be a String")
}
schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
Literal(null, StringType)
).dataType match {
case StringType | BinaryType => // good
case _ =>
throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
s"must be a String or BinaryType")
}
schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
).dataType match {
case StringType | BinaryType => // good
case _ =>
throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
s"must be a String or BinaryType")
}
}

def write(
sparkSession: SparkSession,
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
validateQuery(queryExecution, kafkaParameters, topic)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
}
}
}
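To make the schema rules enforced by validateQuery concrete, a hedged sketch (broker address and topic name are placeholders): a frame with only a "value" column passes validation when the "topic" option supplies a default topic, while a frame without a "value" column fails analysis.

// Illustrative sketch only; not part of this change.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaWriterValidationExample").getOrCreate()
import spark.implicits._

// Passes: "value" is the only required attribute once a default topic is configured.
Seq("a", "b").toDF("value")
  .write.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")
  .save()

// Fails validation with AnalysisException("Required attribute 'value' not found").
Seq(1, 2).toDF("id")
  .write.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")
  .save()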