diff --git a/async_producer.go b/async_producer.go index aced88631..e4af98888 100644 --- a/async_producer.go +++ b/async_producer.go @@ -536,9 +536,9 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) default: Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err) p.abandonBrokerConnection(broker) - p.retryMessages(batch, err) _ = broker.Close() closing = err + p.retryMessages(batch, err) continue }