From 451d61eaf3c837691e9f30cbbf8957c333fe4030 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 24 Feb 2022 13:46:12 -0800 Subject: [PATCH 1/3] send LeaveGroup when thread dies --- .../kafka/streams/processor/internals/StreamThread.java | 5 +++-- .../streams/integration/ErrorHandlingIntegrationTest.java | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3af07bbbd646e..8f14f45783290 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -546,7 +546,8 @@ public void run() { cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); - this.streamsUncaughtExceptionHandler.accept(e, false); + streamsUncaughtExceptionHandler.accept(e, false); + requestLeaveGroupDuringShutdown(); } finally { completeShutdown(cleanRun); } @@ -1249,7 +1250,7 @@ public Optional getGroupInstanceID() { } public void requestLeaveGroupDuringShutdown() { - this.leaveGroupRequested.set(true); + leaveGroupRequested.set(true); } public Map producerMetrics() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java index 40d2cf98027ca..a528ac6960f99 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java @@ -96,8 +96,7 @@ private Properties props() { mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), - mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), - mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)) + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)) ); } From 3867b8126197e76cbae63c04511f37d03c2b7659 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 24 Feb 2022 13:51:03 -0800 Subject: [PATCH 2/3] checkstyle --- .../kafka/streams/integration/ErrorHandlingIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java index a528ac6960f99..f889e3757626c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; From f00b4753666166309f23a132c86902503bf329d7 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 24 Feb 2022 13:57:52 -0800 Subject: [PATCH 3/3] move LeaveGroup --- .../apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 8f14f45783290..cecacb41eefca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -546,8 +546,8 @@ public void run() { cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); - streamsUncaughtExceptionHandler.accept(e, false); requestLeaveGroupDuringShutdown(); + streamsUncaughtExceptionHandler.accept(e, false); } finally { completeShutdown(cleanRun); }