From ae411cd9089d530446b72c2d9e67c229c459e95a Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 12 Mar 2015 17:01:23 +0000 Subject: [PATCH] Remove external calls to disconnectBroker It is now only called from one place in the client. This looks simple but is actually super-subtle, and depends on lazy broker connections (PR #309). disconnectBroker does a whole bunch of different things: - calls `Close` on the broker connection - adds the address to the internal `deadBrokerAddrs` map even if the broker is not a seed, which I think is wrong, since resurrectDeadBrokers will use it to repopulate the seedBrokerAddrs list - rotates seedBrokers (if the broker was a seed broker) - removes it from the brokers map (otherwise) In the producer and consumer where we used to call disconnectBroker: - We now call `Close` directly on the broker. - The broker we are dealing with is not a seed broker, so the seedBrokers do not need rotating, and I don't think it's a problem that it no longer gets added to `deadBrokerAddrs`. - The reason we removed it from the broker map was so that the next request for that broker would trigger a metadata request and reopen the connection. The producer and consumer both manually trigger metadata requests when necessary, and the fact that we now have lazy connection opening means simply closing it (which we do, see first bullet) is enough to cause the connection to reopen the next time it is requested, even if no metadata refresh is requested. --- consumer.go | 3 +-- producer.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index f79f16ae4a..09d8dab941 100644 --- a/consumer.go +++ b/consumer.go @@ -438,7 +438,7 @@ func (w *brokerConsumer) subscriptionConsumer() { response, err := w.fetchNewMessages() if err != nil { - Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", w.broker.addr, err) + Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err) w.abort(err) return } @@ -475,7 +475,6 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo func (w *brokerConsumer) abort(err error) { _ = w.broker.Close() // we don't care about the error this might return, we already have one - w.consumer.client.disconnectBroker(w.broker) for child := range w.subscriptions { child.sendError(err) diff --git a/producer.go b/producer.go index 1dfad6ba92..94483e324f 100644 --- a/producer.go +++ b/producer.go @@ -532,9 +532,9 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) { p.returnErrors(batch, err) continue default: - p.client.disconnectBroker(broker) Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err) closing = err + _ = broker.Close() p.retryMessages(batch, err) continue }