diff --git a/consume.go b/consume.go index 8d9b788..3e11b45 100644 --- a/consume.go +++ b/consume.go @@ -84,15 +84,7 @@ func NewConsumer( // Run starts consuming with automatic reconnection handling. Do not reuse the // consumer for anything other than to close it. func (consumer *Consumer) Run(handler Handler) error { - err := consumer.startGoroutines( - handler, - consumer.options, - ) - if err != nil { - return err - } - - handler = func(d Delivery) (action Action) { + handlerWrapper := func(d Delivery) (action Action) { if !consumer.handlerMu.TryRLock() { return NackRequeue } @@ -100,10 +92,18 @@ func (consumer *Consumer) Run(handler Handler) error { return handler(d) } + err := consumer.startGoroutines( + handlerWrapper, + consumer.options, + ) + if err != nil { + return err + } + for err := range consumer.reconnectErrCh { consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( - handler, + handlerWrapper, consumer.options, ) if err != nil {