From 5ad52d84076689d953246eb3fb2ba4ca16566695 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 9 Jun 2015 10:51:33 -0400 Subject: [PATCH] Don't retry messages until the broker is closed Otherwise there is a case where the retried messages can have their newly-selected broker closed out from under them if the remainder of this goroutine gets heavily delayed by the scheduler. I believe this may be the cause of the flaky failure in TestAsyncProducerBrokerBounce (see e.g. https://travis-ci.org/Shopify/sarama/jobs/66053366) --- async_producer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_producer.go b/async_producer.go index aced88631..e4af98888 100644 --- a/async_producer.go +++ b/async_producer.go @@ -536,9 +536,9 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) default: Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err) p.abandonBrokerConnection(broker) - p.retryMessages(batch, err) _ = broker.Close() closing = err + p.retryMessages(batch, err) continue }