Skip to content

Commit 2bdbc87

Browse files
lw-linzsxwing
authored andcommitted
[SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group
## What changes were proposed in this pull request? In `KafkaOffsetReader`, when error occurs, we abort the existing consumer and create a new consumer. In our current implementation, the first consumer and the second consumer would be in the same group (which leads to SPARK-19559), **_violating our intention of the two consumers not being in the same group._** The cause is that, in our current implementation, the first consumer is created before `groupId` and `nextId` are initialized in the constructor. Then even if `groupId` and `nextId` are increased during the creation of that first consumer, `groupId` and `nextId` would still be initialized to default values in the constructor for the second consumer. We should make sure that `groupId` and `nextId` are initialized before any consumer is created. ## How was this patch tested? Ran 100 times of `KafkaSourceSuite`; all passed Author: Liwei Lin <lwlin7@gmail.com> Closes #16902 from lw-lin/SPARK-19564-.
1 parent bc0a0e6 commit 2bdbc87

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ private[kafka010] class KafkaOffsetReader(
6464
})
6565
val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
6666

67+
/**
68+
* Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
69+
* created -- see SPARK-19564.
70+
*/
71+
private var groupId: String = null
72+
private var nextId = 0
73+
6774
/**
6875
* A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
6976
* offsets and never commits them.
@@ -76,10 +83,6 @@ private[kafka010] class KafkaOffsetReader(
7683
private val offsetFetchAttemptIntervalMs =
7784
readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
7885

79-
private var groupId: String = null
80-
81-
private var nextId = 0
82-
8386
private def nextGroupId(): String = {
8487
groupId = driverGroupIdPrefix + "-" + nextId
8588
nextId += 1

0 commit comments

Comments
 (0)