Skip to content

Commit

Permalink
Remove external calls to disconnectBroker
Browse files Browse the repository at this point in the history
It is now only called from one place in the client. This looks simple but is
actually super-subtle, and depends on lazy broker connections (PR #309).

disconnectBroker does a whole bunch of different things:
- calls `Close` on the broker connection
- adds the address to the internal `deadBrokerAddrs` map even if the broker is
  not a seed, which I think is wrong, since resurrectDeadBrokers will use it to
  repopulate the seedBrokerAddrs list
- rotates seedBrokers (if the broker was a seed broker)
- removes it from the brokers map (otherwise)

In the producer and consumer where we used to call disconnectBroker:
- We now call `Close` directly on the broker.
- The broker we are dealing with is not a seed broker, so the seedBrokers do not
  need rotating, and I don't think it's a problem that it no longer gets added
  to `deadBrokerAddrs`.
- The reason we removed it from the broker map was so that the next request for
  that broker would trigger a metadata request and reopen the connection. The
  producer and consumer both manually trigger metadata requests when necessary,
  and the fact that we now have lazy connection opening means simply closing it
  (which we do, see first bullet) is enough to cause the connection to reopen
  the next time it is requested, even if no metadata refresh is requested.
  • Loading branch information
eapache committed Mar 12, 2015
1 parent f7f2816 commit ae411cd
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 3 deletions.
3 changes: 1 addition & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
response, err := w.fetchNewMessages()

if err != nil {
Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", w.broker.addr, err)
Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err)
w.abort(err)
return
}
Expand Down Expand Up @@ -475,7 +475,6 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo

func (w *brokerConsumer) abort(err error) {
_ = w.broker.Close() // we don't care about the error this might return, we already have one
w.consumer.client.disconnectBroker(w.broker)

for child := range w.subscriptions {
child.sendError(err)
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,9 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
p.returnErrors(batch, err)
continue
default:
p.client.disconnectBroker(broker)
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
closing = err
_ = broker.Close()
p.retryMessages(batch, err)
continue
}
Expand Down

0 comments on commit ae411cd

Please sign in to comment.