From a4e904e16c77bd18411e9e311cfa9ce16e66a7be Mon Sep 17 00:00:00 2001 From: Xiaojun Zhao <210xiaojun@163.com> Date: Tue, 2 Jul 2024 16:35:35 +0800 Subject: [PATCH] bugfix: graceful consumer shutdown --- consume.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/consume.go b/consume.go index 8d9b788..3e11b45 100644 --- a/consume.go +++ b/consume.go @@ -84,15 +84,7 @@ func NewConsumer( // Run starts consuming with automatic reconnection handling. Do not reuse the // consumer for anything other than to close it. func (consumer *Consumer) Run(handler Handler) error { - err := consumer.startGoroutines( - handler, - consumer.options, - ) - if err != nil { - return err - } - - handler = func(d Delivery) (action Action) { + handlerWrapper := func(d Delivery) (action Action) { if !consumer.handlerMu.TryRLock() { return NackRequeue } @@ -100,10 +92,18 @@ func (consumer *Consumer) Run(handler Handler) error { return handler(d) } + err := consumer.startGoroutines( + handlerWrapper, + consumer.options, + ) + if err != nil { + return err + } + for err := range consumer.reconnectErrCh { consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( - handler, + handlerWrapper, consumer.options, ) if err != nil {