From d755739842caf170e1a90e7ffd605123f830fd50 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 12 Feb 2017 17:55:41 +0800 Subject: [PATCH 1/3] Fix --- .../apache/spark/sql/kafka010/KafkaOffsetReader.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 6b2fb3c11255..2696d6f089d2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -64,6 +64,13 @@ private[kafka010] class KafkaOffsetReader( }) val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread) + /** + * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is + * created -- see SPARK-19564. + */ + private var groupId: String = null + private var nextId = 0 + /** * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the * offsets and never commits them. @@ -76,10 +83,6 @@ private[kafka010] class KafkaOffsetReader( private val offsetFetchAttemptIntervalMs = readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong - private var groupId: String = null - - private var nextId = 0 - private def nextGroupId(): String = { groupId = driverGroupIdPrefix + "-" + nextId nextId += 1 From 0f19af3b6b2fdc8f527f952bbf65681755c3f6b4 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 12 Feb 2017 21:40:37 +0800 Subject: [PATCH 2/3] Run 100 times # This is the 1st commit message: Run 100 times # This is the commit message #2: Revert "Run 100 times" --- dev/run-tests | 4 +++- .../org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 257d1e8d50bb..9aa2e1af41ae 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -20,4 +20,6 @@ FWDIR="$(cd "`dirname $0`"/..; pwd)" cd "$FWDIR" -exec python -u ./dev/run-tests.py "$@" +build/sbt -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 "project sql-kafka-0-10" "test-only *KafkaSourceSuite*" + +# exec python -u ./dev/run-tests.py "$@" diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 211c8a5e73e4..7bfaa1c48183 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -311,7 +311,8 @@ class KafkaSourceSuite extends KafkaSourceTest { } } - test("subscribing topic by pattern with topic deletions") { + for (idx <- 1 to 100) + test(s"subscribing topic by pattern with topic deletions $idx") { val topicPrefix = newTopic() val topic = topicPrefix + "-seems" val topic2 = topicPrefix + "-bad" From cda7b8f46317ca0155e3bdc0cf9e8f81451d6191 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 12 Feb 2017 23:14:47 +0800 Subject: [PATCH 3/3] Revert "Run 100 times" This reverts commit 0f19af3b6b2fdc8f527f952bbf65681755c3f6b4. --- dev/run-tests | 4 +--- .../org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 9aa2e1af41ae..257d1e8d50bb 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -20,6 +20,4 @@ FWDIR="$(cd "`dirname $0`"/..; pwd)" cd "$FWDIR" -build/sbt -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 "project sql-kafka-0-10" "test-only *KafkaSourceSuite*" - -# exec python -u ./dev/run-tests.py "$@" +exec python -u ./dev/run-tests.py "$@" diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 7bfaa1c48183..211c8a5e73e4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -311,8 +311,7 @@ class KafkaSourceSuite extends KafkaSourceTest { } } - for (idx <- 1 to 100) - test(s"subscribing topic by pattern with topic deletions $idx") { + test("subscribing topic by pattern with topic deletions") { val topicPrefix = newTopic() val topic = topicPrefix + "-seems" val topic2 = topicPrefix + "-bad"