From 11034314c00b8cb3287527bf045abbd0c5409495 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Mon, 6 Mar 2023 19:09:12 +0100 Subject: [PATCH] Fix issue in "restartStreamsOnRebalancing mode closes all partition streams" test relating to shutdown. This was also fixed in #591 --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 9029547ab..60ee7dff7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -124,7 +124,7 @@ private[consumer] final class Runloop( s"onRevoked called on rebalance listener with pending assigned event" ) ) - } + }.unlessZIO(isShutdown).unit } } ) @@ -512,7 +512,7 @@ private[consumer] final class Runloop( case r @ Command.ChangeSubscription(_, _, _) => r.succeed.as(state) case _ @Command.Commit(_, cont) => - cont.fail(new KafkaException("Consumer is shutting down")).as(state) + cont.fail(new KafkaException("Consumer is shutting down")).as(state) // TODO can we just allow it? }