Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
val schema = queryExecution.analyzed.output
schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
if (topic == None) {
throw new AnalysisException(s"topic option required when no " +
Expand Down Expand Up @@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.logical.output
val schema = queryExecution.analyzed.output
validateQuery(queryExecution, kafkaParameters, topic)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
queryExecution.toRdd.foreachPartition { iter =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BinaryType, DataType}
Expand Down Expand Up @@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
s"save mode overwrite not allowed for kafka"))
}

test("SPARK-20496: batch - enforce analyzed plans") {
val inputEvents =
spark.range(1, 1000)
.select(to_json(struct("*")) as 'value)

val topic = newTopic()
testUtils.createTopic(topic)
// used to throw UnresolvedException
inputEvents.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.save()
}

test("streaming - write to kafka with topic field") {
val input = MemoryStream[String]
val topic = newTopic()
Expand Down