From f2ad547aff257bac2391a8ecf9fddaf04c0ad7b3 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sun, 10 Dec 2023 16:20:46 +0300 Subject: [PATCH] fix graceful shutdown --- batch_consumer.go | 2 ++ consumer.go | 2 ++ consumer_base.go | 4 +--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 528384b..a650ad5 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -76,6 +76,8 @@ func (b *batchConsumer) startBatch() { b.consume(&messages, &commitMessages) case msg, ok := <-b.incomingMessageStream: if !ok { + close(b.batchConsumingStream) + close(b.messageProcessedStream) return } diff --git a/consumer.go b/consumer.go index 08bd9e3..0825e6e 100644 --- a/consumer.go +++ b/consumer.go @@ -68,6 +68,8 @@ func (c *consumer) startBatch() { c.consume(&messages, &commitMessages) case msg, ok := <-c.incomingMessageStream: if !ok { + close(c.singleConsumingStream) + close(c.messageProcessedStream) return } diff --git a/consumer_base.go b/consumer_base.go index 74c316d..4b7c935 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -160,10 +160,8 @@ func (c *base) Stop() error { c.cancelFn() c.quit <- struct{}{} close(c.incomingMessageStream) - close(c.singleConsumingStream) - close(c.batchConsumingStream) - close(c.messageProcessedStream) c.wg.Wait() + err = c.r.Close() })