Skip to content

Commit

Permalink
Merge pull request #338 from Shopify/die-disconnect-broker-die
Browse files Browse the repository at this point in the history
Remove external calls to disconnectBroker
  • Loading branch information
wvanbergen committed Mar 13, 2015
2 parents 401d5b1 + a778a8e commit 20f98a6
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 3 deletions.
3 changes: 1 addition & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
response, err := w.fetchNewMessages()

if err != nil {
Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", w.broker.addr, err)
Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err)
w.abort(err)
return
}
Expand Down Expand Up @@ -475,7 +475,6 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo

func (w *brokerConsumer) abort(err error) {
_ = w.broker.Close() // we don't care about the error this might return, we already have one
w.consumer.client.disconnectBroker(w.broker)

for child := range w.subscriptions {
child.sendError(err)
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,9 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
p.returnErrors(batch, err)
continue
default:
p.client.disconnectBroker(broker)
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
closing = err
_ = broker.Close()
p.retryMessages(batch, err)
continue
}
Expand Down

0 comments on commit 20f98a6

Please sign in to comment.