From 375e860a2d9dbe8d3791f192d79311b92b515326 Mon Sep 17 00:00:00 2001 From: Dima Date: Thu, 12 Dec 2024 10:35:21 +0100 Subject: [PATCH 1/2] Fix KafkaMicroBatchSourceSuite to cover KafkaOffsetReaderConsumer with unit tests --- .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 1d119de43970f..764ead7217413 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1612,6 +1612,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { spark.conf.set( SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key, classOf[KafkaSourceProvider].getCanonicalName) + spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true") } test("V1 Source is used when disabled through SQLConf") { @@ -1639,6 +1640,11 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true") + } + test("V2 Source is used by default") { val topic = newTopic() testUtils.createTopic(topic, partitions = 5) From 80862416eb81ce7b9e040c0342a31ee967426635 Mon Sep 17 00:00:00 2001 From: Dima Date: Fri, 13 Dec 2024 15:13:37 +0100 Subject: [PATCH 2/2] Address @HeartSaVioR comments --- .../kafka010/KafkaMicroBatchSourceSuite.scala | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 764ead7217413..22eeae97874b1 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1591,28 +1591,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with } } - -class KafkaMicroBatchV1SourceWithAdminSuite extends KafkaMicroBatchV1SourceSuite { - override def beforeAll(): Unit = { - super.beforeAll() - spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false") - } -} - -class KafkaMicroBatchV2SourceWithAdminSuite extends KafkaMicroBatchV2SourceSuite { - override def beforeAll(): Unit = { - super.beforeAll() - spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false") - } -} - -class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { +abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { override def beforeAll(): Unit = { super.beforeAll() spark.conf.set( SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key, classOf[KafkaSourceProvider].getCanonicalName) - spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true") } test("V1 Source is used when disabled through SQLConf") { @@ -1638,12 +1622,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { } } -class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { - - override def beforeAll(): Unit = { - super.beforeAll() - spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true") - } +abstract class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { test("V2 Source is used by default") { val topic = newTopic() @@ -1876,6 +1855,35 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { } } +class KafkaMicroBatchV1SourceWithAdminSuite extends KafkaMicroBatchV1SourceSuite { + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false") + } +} + +class KafkaMicroBatchV1SourceWithConsumerSuite extends KafkaMicroBatchV1SourceSuite { + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true") + } +} + +class KafkaMicroBatchV2SourceWithAdminSuite extends KafkaMicroBatchV2SourceSuite { + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "false") + } +} + +class KafkaMicroBatchV2SourceWithConsumerSuite extends KafkaMicroBatchV2SourceSuite { + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING.key, "true") + } +} + + abstract class KafkaSourceSuiteBase extends KafkaSourceTest { import testImplicits._