From 05f17c6d3f26cc747dd0781b22486b95e1a563cd Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Fri, 28 Apr 2017 07:50:08 -0700
Subject: [PATCH 1/3] fix bug

---
 .../spark/sql/kafka010/KafkaWriter.scala        |  4 ++--
 .../spark/sql/kafka010/KafkaSinkSuite.scala     | 24 +++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index a637d52c933a..61936e32fd83 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.logical.output
+    val schema = queryExecution.analyzed.output
     schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
       if (topic == None) {
         throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.logical.output
+    val schema = queryExecution.analyzed.output
     validateQuery(queryExecution, kafkaParameters, topic)
     SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
       queryExecution.toRdd.foreachPartition { iter =>
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 4bd052d249ec..a66594114968 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -26,13 +26,16 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
 
 class KafkaSinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
 
   protected var testUtils: KafkaTestUtils = _
 
@@ -108,6 +111,27 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       s"save mode overwrite not allowed for kafka"))
   }
 
+  test("batch - enforce analyzed plans SPARK-20496") {
+    val adTypes = Seq("banner")
+    val eventTypes = Seq("view")
+
+    val inputEvents =
+      spark.range(1, 1000)
+        .select(to_json(struct("*")) as 'value)
+
+
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    // for improperly formatted DataFrame
+    inputEvents.write
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("topic", topic)
+      .save()
+
+  }
+
   test("streaming - write to kafka with topic field") {
     val input = MemoryStream[String]
     val topic = newTopic()

From 77e25f1bddbeb9345d8589b36c179eaf0d846158 Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Fri, 28 Apr 2017 07:53:38 -0700
Subject: [PATCH 2/3] minor cleanup

---
 .../scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index a66594114968..b53b9009c76c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -112,18 +112,13 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
   }
 
   test("batch - enforce analyzed plans SPARK-20496") {
-    val adTypes = Seq("banner")
-    val eventTypes = Seq("view")
-
     val inputEvents =
       spark.range(1, 1000)
         .select(to_json(struct("*")) as 'value)
 
-
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    // for improperly formatted DataFrame
     inputEvents.write
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)

From 4ad21fecf89318c6e58194bb8909116f60120cb7 Mon Sep 17 00:00:00 2001
From: Bill Chambers
Date: Fri, 28 Apr 2017 09:13:33 -0700
Subject: [PATCH 3/3] nits

---
 .../org/apache/spark/sql/kafka010/KafkaSinkSuite.scala      | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index b53b9009c76c..2ab336c7ac47 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -26,7 +26,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
@@ -35,7 +34,6 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
 
 class KafkaSinkSuite extends StreamTest with SharedSQLContext {
-  import testImplicits._
 
   protected var testUtils: KafkaTestUtils = _
 
@@ -111,20 +109,19 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       s"save mode overwrite not allowed for kafka"))
   }
 
-  test("batch - enforce analyzed plans SPARK-20496") {
+  test("SPARK-20496: batch - enforce analyzed plans") {
     val inputEvents =
       spark.range(1, 1000)
         .select(to_json(struct("*")) as 'value)
 
     val topic = newTopic()
     testUtils.createTopic(topic)
-
-    inputEvents.write
+    // used to throw UnresolvedException
+    inputEvents.write
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("topic", topic)
       .save()
-
   }
 
   test("streaming - write to kafka with topic field") {