diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
index 5bdc1b5fe9f3..8b907065af1d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -25,9 +25,9 @@ import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecor
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.header.internals.RecordHeader
 
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
-import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
+import org.apache.spark.sql.types.BinaryType
 
 /**
  * Writes out data in a single Spark task, without any concerns about how
@@ -116,66 +116,13 @@ private[kafka010] abstract class KafkaRowWriter(
   }
 
   private def createProjection = {
-    val topicExpression = topic.map(Literal(_)).orElse {
-      inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
-    }.getOrElse {
-      throw new IllegalStateException(s"topic option required when no " +
-        s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
-    }
-    topicExpression.dataType match {
-      case StringType => // good
-      case t =>
-        throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
-          s"must be a ${StringType.catalogString}")
-    }
-    val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
-      .getOrElse(Literal(null, BinaryType))
-    keyExpression.dataType match {
-      case StringType | BinaryType => // good
-      case t =>
-        throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type ${t.catalogString}")
-    }
-    val valueExpression = inputSchema
-      .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
-      throw new IllegalStateException("Required attribute " +
-        s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
-    )
-    valueExpression.dataType match {
-      case StringType | BinaryType => // good
-      case t =>
-        throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type ${t.catalogString}")
-    }
-    val headersExpression = inputSchema
-      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
-      Literal(CatalystTypeConverters.convertToCatalyst(null),
-        KafkaRecordToRowConverter.headersType)
-    )
-    headersExpression.dataType match {
-      case KafkaRecordToRowConverter.headersType => // good
-      case t =>
-        throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type ${t.catalogString}")
-    }
-    val partitionExpression =
-      inputSchema.find(_.name == KafkaWriter.PARTITION_ATTRIBUTE_NAME)
-      .getOrElse(Literal(null, IntegerType))
-    partitionExpression.dataType match {
-      case IntegerType => // good
-      case t =>
-        throw new IllegalStateException(s"${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
-          s"attribute unsupported type $t. ${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
-          s"must be a ${IntegerType.catalogString}")
-    }
     UnsafeProjection.create(
       Seq(
-        topicExpression,
-        Cast(keyExpression, BinaryType),
-        Cast(valueExpression, BinaryType),
-        headersExpression,
-        partitionExpression
+        KafkaWriter.topicExpression(inputSchema, topic),
+        Cast(KafkaWriter.keyExpression(inputSchema), BinaryType),
+        Cast(KafkaWriter.valueExpression(inputSchema), BinaryType),
+        KafkaWriter.headersExpression(inputSchema),
+        KafkaWriter.partitionExpression(inputSchema)
       ),
       inputSchema
     )
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 9b0d11f137ce..5ef4b3a1c19d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.types.{BinaryType, IntegerType, MapType, StringType}
+import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType}
 import org.apache.spark.util.Utils
 
 /**
@@ -49,51 +49,14 @@ private[kafka010] object KafkaWriter extends Logging {
       schema: Seq[Attribute],
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
-      if (topic.isEmpty) {
-        throw new AnalysisException(s"topic option required when no " +
-          s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
-          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
-      } else {
-        Literal.create(topic.get, StringType)
-      }
-    ).dataType match {
-      case StringType => // good
-      case _ =>
-        throw new AnalysisException(s"Topic type must be a ${StringType.catalogString}")
-    }
-    schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
-      Literal(null, StringType)
-    ).dataType match {
-      case StringType | BinaryType => // good
-      case _ =>
-        throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
-          s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
-    }
-    schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
-      throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
-    ).dataType match {
-      case StringType | BinaryType => // good
-      case _ =>
-        throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
-          s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
-    }
-    schema.find(_.name == HEADERS_ATTRIBUTE_NAME).getOrElse(
-      Literal(CatalystTypeConverters.convertToCatalyst(null),
-        KafkaRecordToRowConverter.headersType)
-    ).dataType match {
-      case KafkaRecordToRowConverter.headersType => // good
-      case _ =>
-        throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " +
-          s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}")
-    }
-    schema.find(_.name == PARTITION_ATTRIBUTE_NAME).getOrElse(
-      Literal(null, IntegerType)
-    ).dataType match {
-      case IntegerType => // good
-      case _ =>
-        throw new AnalysisException(s"$PARTITION_ATTRIBUTE_NAME attribute type " +
-          s"must be an ${IntegerType.catalogString}")
+    try {
+      topicExpression(schema, topic)
+      keyExpression(schema)
+      valueExpression(schema)
+      headersExpression(schema)
+      partitionExpression(schema)
+    } catch {
+      case e: IllegalStateException => throw new AnalysisException(e.getMessage)
     }
   }
 
@@ -110,4 +73,53 @@ private[kafka010] object KafkaWriter extends Logging {
       finallyBlock = writeTask.close())
     }
   }
+
+  def topicExpression(schema: Seq[Attribute], topic: Option[String] = None): Expression = {
+    topic.map(Literal(_)).getOrElse(
+      expression(schema, TOPIC_ATTRIBUTE_NAME, Seq(StringType)) {
+        throw new IllegalStateException(s"topic option required when no " +
+          s"'${TOPIC_ATTRIBUTE_NAME}' attribute is present. Use the " +
+          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
+      }
+    )
+  }
+
+  def keyExpression(schema: Seq[Attribute]): Expression = {
+    expression(schema, KEY_ATTRIBUTE_NAME, Seq(StringType, BinaryType)) {
+      Literal(null, BinaryType)
+    }
+  }
+
+  def valueExpression(schema: Seq[Attribute]): Expression = {
+    expression(schema, VALUE_ATTRIBUTE_NAME, Seq(StringType, BinaryType)) {
+      throw new IllegalStateException(s"Required attribute '${VALUE_ATTRIBUTE_NAME}' not found")
+    }
+  }
+
+  def headersExpression(schema: Seq[Attribute]): Expression = {
+    expression(schema, HEADERS_ATTRIBUTE_NAME, Seq(KafkaRecordToRowConverter.headersType)) {
+      Literal(CatalystTypeConverters.convertToCatalyst(null),
+        KafkaRecordToRowConverter.headersType)
+    }
+  }
+
+  def partitionExpression(schema: Seq[Attribute]): Expression = {
+    expression(schema, PARTITION_ATTRIBUTE_NAME, Seq(IntegerType)) {
+      Literal(null, IntegerType)
+    }
+  }
+
+  private def expression(
+      schema: Seq[Attribute],
+      attrName: String,
+      desired: Seq[DataType])(
+      default: => Expression): Expression = {
+    val expr = schema.find(_.name == attrName).getOrElse(default)
+    if (!desired.exists(_.sameType(expr.dataType))) {
+      throw new IllegalStateException(s"$attrName attribute unsupported type " +
+        s"${expr.dataType.catalogString}. $attrName must be a(n) " +
+        s"${desired.map(_.catalogString).mkString(" or ")}")
+    }
+    expr
+  }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index cbf4952406c0..031f609cb92b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.kafka010
 
 import java.util.Locale
 
+import scala.reflect.ClassTag
+
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{BinaryType, DataType}
 import org.apache.spark.util.Utils
@@ -192,24 +195,9 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    /* No topic field or topic option */
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(input.toDF())(
-        withSelectExpr = "CAST(null as STRING) as topic", "value"
-      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
+    runAndVerifyException[StreamingQueryException](inputTopic, "null topic present in the data.") {
+      createKafkaWriter(input.toDF())(withSelectExpr = "CAST(null as STRING) as topic", "value")
     }
-    assert(ex.getCause.getCause.getMessage
-      .toLowerCase(Locale.ROOT)
-      .contains("null topic present in the data."))
   }
 
   test("streaming - write data with bad schema") {
@@ -226,24 +214,10 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
    testUtils.createTopic(topic)
 
-    val ex = intercept[AnalysisException] {
-      /* No topic field or topic option */
-      createKafkaWriter(input.toDF())(
-        withSelectExpr = "value as key", "value"
-      )
-    }
-    assert(ex.getMessage
-      .toLowerCase(Locale.ROOT)
-      .contains("topic option required when no 'topic' attribute is present"))
-
-    val ex2 = intercept[AnalysisException] {
-      /* No value field */
-      createKafkaWriter(input.toDF())(
-        withSelectExpr = s"'$topic' as topic", "value as key"
-      )
-    }
-    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
-      "required attribute 'value' not found"))
+    assertWrongSchema(topic, input, Seq("value as key", "value"),
+      "topic option required when no 'topic' attribute is present")
+    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as key"),
+      "required attribute 'value' not found")
   }
 
   test("streaming - write data with valid schema but wrong types") {
@@ -258,43 +232,18 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("startingOffsets", "earliest")
       .load()
       .selectExpr("CAST(value as STRING) value")
+      .toDF()
 
     val topic = newTopic()
     testUtils.createTopic(topic)
-    val ex = intercept[AnalysisException] {
-      /* topic field wrong type */
-      createKafkaWriter(input.toDF())(
-        withSelectExpr = s"CAST('1' as INT) as topic", "value"
-      )
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
-
-    val ex2 = intercept[AnalysisException] {
-      /* value field wrong type */
-      createKafkaWriter(input.toDF())(
-        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
-      )
-    }
-    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
-      "value attribute type must be a string or binary"))
-
-    val ex3 = intercept[AnalysisException] {
-      /* key field wrong type */
-      createKafkaWriter(input.toDF())(
-        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
-      )
-    }
-    assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
-      "key attribute type must be a string or binary"))
-
-    val ex4 = intercept[AnalysisException] {
-      /* partition field wrong type */
-      createKafkaWriter(input.toDF())(
-        withSelectExpr = s"'$topic' as topic", "value as partition", "value"
-      )
-    }
-    assert(ex4.getMessage.toLowerCase(Locale.ROOT).contains(
-      "partition attribute type must be an int"))
+    assertWrongSchema(topic, input, Seq("CAST('1' as INT) as topic", "value"),
+      "topic must be a(n) string")
+    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as value"),
+      "value must be a(n) string or binary")
+    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as key", "value"),
+      "key must be a(n) string or binary")
+    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as partition", "value"),
+      "partition must be a(n) int")
   }
 
   test("streaming - write to non-existing topic") {
@@ -310,21 +259,9 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("startingOffsets", "earliest")
       .load()
     val topic = newTopic()
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        eventually(timeout(streamingTimeout)) {
-          assert(writer.exception.isDefined)
-        }
-        throw writer.exception.get
-      }
-    } finally {
-      writer.stop()
+    runAndVerifyException[StreamingQueryException](inputTopic, "job aborted") {
+      createKafkaWriter(input.toDF(), withTopic = Some(topic))()
     }
-    assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
   }
 
   test("streaming - exception on config serializer") {
@@ -339,21 +276,10 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("subscribe", inputTopic)
       .load()
 
-    val ex = intercept[IllegalArgumentException] {
-      createKafkaWriter(
-        input.toDF(),
-        withOptions = Map("kafka.key.serializer" -> "foo"))()
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "kafka option 'key.serializer' is not supported"))
-
-    val ex2 = intercept[IllegalArgumentException] {
-      createKafkaWriter(
-        input.toDF(),
-        withOptions = Map("kafka.value.serializer" -> "foo"))()
-    }
-    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
-      "kafka option 'value.serializer' is not supported"))
+    assertWrongOption(inputTopic, input.toDF(), Map("kafka.key.serializer" -> "foo"),
+      "kafka option 'key.serializer' is not supported")
+    assertWrongOption(inputTopic, input.toDF(), Map("kafka.value.serializer" -> "foo"),
+      "kafka option 'value.serializer' is not supported")
   }
 
   test("generic - write big data with small producer buffer") {
@@ -422,4 +348,48 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     withOptions.foreach(opt => stream.option(opt._1, opt._2))
     stream.start()
   }
+
+  private def runAndVerifyException[T <: Exception : ClassTag](
+      inputTopic: String,
+      expectErrorMsg: String)(
+      writerFn: => StreamingQuery): Unit = {
+    var writer: StreamingQuery = null
+    val ex: Exception = try {
+      intercept[T] {
+        writer = writerFn
+        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+        eventually(timeout(streamingTimeout)) {
+          assert(writer.exception.isDefined)
+        }
+        throw writer.exception.get
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    val rootException = ex match {
+      case e: StreamingQueryException => e.getCause.getCause
+      case e => e
+    }
+    assert(rootException.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+  }
+
+  private def assertWrongSchema(
+      inputTopic: String,
+      input: DataFrame,
+      selectExpr: Seq[String],
+      expectErrorMsg: String): Unit = {
+    runAndVerifyException[AnalysisException](inputTopic, expectErrorMsg) {
+      createKafkaWriter(input)(withSelectExpr = selectExpr: _*)
+    }
+  }
+
+  private def assertWrongOption(
+      inputTopic: String,
+      input: DataFrame,
+      options: Map[String, String],
+      expectErrorMsg: String): Unit = {
+    runAndVerifyException[IllegalArgumentException](inputTopic, expectErrorMsg) {
+      createKafkaWriter(input, withOptions = options)()
+    }
+  }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index aacb10f5197b..1705d76de758 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -211,38 +211,10 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    /* No topic field or topic option */
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = "value as key", "value"
-        )
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
-    }
-    assert(ex.getMessage
-      .toLowerCase(Locale.ROOT)
-      .contains("topic option required when no 'topic' attribute is present"))
-
-    try {
-      /* No value field */
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "value as key"
-        )
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "required attribute 'value' not found"))
+    assertWrongSchema(input, Seq("value as key", "value"),
+      "topic option required when no 'topic' attribute is present")
+    assertWrongSchema(input, Seq(s"'$topic' as topic", "value as key"),
+      "required attribute 'value' not found")
   }
 
   test("streaming - write data with valid schema but wrong types") {
@@ -250,109 +222,31 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
     val input = MemoryStream[String]
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      /* topic field wrong type */
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"CAST('1' as INT) as topic", "value"
-        )
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
-
-    try {
-      /* value field wrong type */
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
-        )
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "value attribute type must be a string or binary"))
-
-    try {
-      ex = intercept[StreamingQueryException] {
-        /* key field wrong type */
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
-        )
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "key attribute type must be a string or binary"))
-
-    try {
-      ex = intercept[StreamingQueryException] {
-        /* partition field wrong type */
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "value", "value as partition"
-        )
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
-    }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-      "partition attribute type must be an int"))
+    assertWrongSchema(input, Seq("CAST('1' as INT) as topic", "value"),
+      "topic must be a(n) string")
+    assertWrongSchema(input, Seq(s"'$topic' as topic", "CAST(value as INT) as value"),
+      "value must be a(n) string or binary")
+    assertWrongSchema(input, Seq(s"'$topic' as topic", "CAST(value as INT) as key", "value"),
+      "key must be a(n) string or binary")
+    assertWrongSchema(input, Seq(s"'$topic' as topic", "value", "value as partition"),
+      "partition must be a(n) int")
   }
 
   test("streaming - write to non-existing topic") {
     val input = MemoryStream[String]
-    val topic = newTopic()
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
-        input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
-      }
-    } finally {
-      writer.stop()
+    runAndVerifyStreamingQueryException(input, "job aborted") {
+      createKafkaWriter(input.toDF(), withTopic = Some(newTopic()))()
     }
-    assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
   }
 
   test("streaming - exception on config serializer") {
     val input = MemoryStream[String]
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    ex = intercept[StreamingQueryException] {
-      writer = createKafkaWriter(
-        input.toDF(),
-        withOptions = Map("kafka.key.serializer" -> "foo"))()
-      input.addData("1")
-      writer.processAllAvailable()
-    }
-    assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
-      "kafka option 'key.serializer' is not supported"))
-
-    ex = intercept[StreamingQueryException] {
-      writer = createKafkaWriter(
-        input.toDF(),
-        withOptions = Map("kafka.value.serializer" -> "foo"))()
-      input.addData("1")
-      writer.processAllAvailable()
-    }
-    assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
-      "kafka option 'value.serializer' is not supported"))
+
+    assertWrongOption(input, Map("kafka.key.serializer" -> "foo"),
+      "kafka option 'key.serializer' is not supported")
+    assertWrongOption(input, Map("kafka.value.serializer" -> "foo"),
+      "kafka option 'value.serializer' is not supported")
   }
 
   private def createKafkaWriter(
@@ -379,6 +273,41 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
     }
     stream.start()
   }
+
+  private def runAndVerifyStreamingQueryException(
+      input: MemoryStream[String],
+      expectErrorMsg: String)(
+      writerFn: => StreamingQuery): Unit = {
+    var writer: StreamingQuery = null
+    val ex: Exception = try {
+      intercept[StreamingQueryException] {
+        writer = writerFn
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+  }
+
+  private def assertWrongSchema(
+      input: MemoryStream[String],
+      selectExpr: Seq[String],
+      expectErrorMsg: String): Unit = {
+    runAndVerifyStreamingQueryException(input, expectErrorMsg) {
+      createKafkaWriter(input.toDF())(withSelectExpr = selectExpr: _*)
+    }
+  }
+
+  private def assertWrongOption(
+      input: MemoryStream[String],
+      options: Map[String, String],
+      expectErrorMsg: String): Unit = {
+    runAndVerifyStreamingQueryException(input, expectErrorMsg) {
+      createKafkaWriter(input.toDF(), withOptions = options)()
+    }
+  }
 }
 
 abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {