Skip to content

Commit

Permalink
Merge pull request #529 from Shopify/fix-consumer-race
Browse files Browse the repository at this point in the history
Fix consumer race panic on close
  • Loading branch information
eapache committed Sep 1, 2015
2 parents 60a3ef1 + cdd80eb commit 9c6285c
Showing 1 changed file with 45 additions and 37 deletions.
82 changes: 45 additions & 37 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,7 @@ func (bc *brokerConsumer) subscriptionConsumer() {

// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}
bc.updateSubscriptions(newSubscriptions)

if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
Expand All @@ -601,46 +598,57 @@ func (bc *brokerConsumer) subscriptionConsumer() {
}
}

func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

for child := range bc.subscriptions {
select {
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
break
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
delete(bc.subscriptions, child)
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// shut it down and force the user to choose what to do
child.sendError(result)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
close(child.trigger)
delete(bc.subscriptions, child)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
// dunno, tell the user and try redispatching
child.sendError(result)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
}
break
}
}
}

func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
break
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
delete(bc.subscriptions, child)
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// shut it down and force the user to choose what to do
child.sendError(result)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
close(child.trigger)
delete(bc.subscriptions, child)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
// dunno, tell the user and try redispatching
child.sendError(result)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
}
}
}
Expand Down

0 comments on commit 9c6285c

Please sign in to comment.