@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
@@ -0,0 +1 @@
{"id":"fc415a71-f0a2-4c3c-aeaf-f9e258c3f726"}
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1568508285207,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}}
{"spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62":{"2":3,"4":3,"1":3,"3":3,"0":3}}
Binary files not shown (6 files).
@@ -28,12 +28,13 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random

import org.apache.commons.io.FileUtils
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
Expand All @@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with KafkaTest {

@@ -1162,6 +1164,62 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
}

test("default config of includeHeader doesn't break existing query from Spark 2.4") {
Contributor: +1 for this test. 😄

import testImplicits._

// This topic name was carried over from the Spark 2.4.3 test run
val topic = "spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62"
// Create the same topic and send the same messages as in that run
testUtils.createTopic(topic, partitions = 5, overwrite = true)
testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
require(testUtils.getLatestOffsets(Set(topic)).size === 5)

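// Also send five post-checkpoint records, one per partition, each carrying
// two headers; the restarted query must read these even though the 2.4.3
// run never saw them.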
(31 to 35).map { num =>
(num - 31, (num.toString, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))))
}.foreach { rec => testUtils.sendMessage(topic, rec._2, Some(rec._1)) }

val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("subscribePattern", topic)
.option("startingOffsets", "earliest")
.load()

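// Rebuild the exact query shape used when the 2.4.3 checkpoint was written:
// stateful dedup plus a value transformation.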
val query = kafka.dropDuplicates()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
.map(kv => kv._2.toInt + 1)

val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/").toURI

val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Otherwise the test would pass on the first run but fail on subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)

testStream(query)(
StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
/*
Note: The checkpoint was generated using the following input in Spark version 2.4.3
testUtils.createTopic(topic, partitions = 5, overwrite = true)

testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
*/
makeSureGetOffsetCalled,
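// The committed offsets above end at 3 for every partition, so the restarted
// query resumes there and reads only the five new records; 31..35 mapped
// through (_ + 1) gives 32..36. dropDuplicates() keeps the 2.4.3 state store
// format in play.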
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
}

abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
@@ -1414,7 +1472,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
val now = System.currentTimeMillis()
val topic = newTopic()
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, Array(1).map(_.toString))
testUtils.sendMessage(
topic, ("1", Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))), None
)

val kafka = spark
.readStream
@@ -1423,6 +1483,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
.option("kafka.metadata.max.age.ms", "1")
.option("startingOffsets", s"earliest")
.option("subscribe", topic)
.option("includeHeaders", "true")
.load()

val query = kafka
@@ -1445,6 +1506,21 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
// producer. So here we just use a low bound to make sure the internal conversion works.
assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")

def checkHeader(row: Row, expected: Seq[(String, Array[Byte])]): Unit = {
// array<struct<key:string,value:binary>>
val headers = row.getList[Row](row.fieldIndex("headers")).asScala
assert(headers.length === expected.length)

(0 until expected.length).foreach { idx =>
val key = headers(idx).getAs[String]("key")
val value = headers(idx).getAs[Array[Byte]]("value")
assert(key === expected(idx)._1)
assert(value === expected(idx)._2)
}
}

checkHeader(row, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8))))
query.stop()
}
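
For downstream consumers, checkHeader above documents the shape of the new column: with includeHeaders enabled the source exposes a headers column typed array<struct<key:string,value:binary>>. A minimal sketch of reading it back in a batch query, given a SparkSession `spark` (the topic name and broker address are placeholders, not from this patch):

import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row

// Placeholder topic/broker; adjust for your environment.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "some-topic")
  .option("includeHeaders", "true")
  .load()

df.select("value", "headers").collect().foreach { row =>
  // headers is an array<struct<key:string,value:binary>> column.
  val headers = row.getList[Row](row.fieldIndex("headers")).asScala
  headers.foreach { h =>
    val key = h.getAs[String]("key")
    val value = new String(h.getAs[Array[Byte]]("value"), UTF_8)
    println(s"$key -> $value")
  }
}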
