Skip to content

Commit

Permalink
Fix hypothetical consumer deadlock, attempt 2
Browse files Browse the repository at this point in the history
Fixes bug #475
  • Loading branch information
eapache committed Jul 15, 2015
1 parent c1d582e commit fc66764
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,14 @@ func (child *partitionConsumer) responseFeeder() {
// so shut it down and force the user to choose what to do
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
child.sendError(err)
child.AsyncClose()
child.safeRedispatch(nil)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dying <- err
child.safeRedispatch(err)
default:
// dunno, tell the user and try redispatching
child.sendError(err)
child.dying <- err
child.safeRedispatch(err)
}

child.broker.acks.Done()
Expand All @@ -426,6 +426,24 @@ func (child *partitionConsumer) responseFeeder() {
close(child.errors)
}

func (child *partitionConsumer) safeRedispatch(err error) {
// This is a weird little function to handle a weird little case. If the user calls AsyncClose
// just before the broker for this partition returns an error, then there will already be a
// value in the buffer for child.dying, and a simple write (the old code) would block. Neither
// the partitionConsumer nor the brokerConsumer will try and actually consume that value until
// the responseFeeder returns, leading to a deadlock.
// Instead, try both writing to and reading from child.dying. If we write successfully (99.9%)
// nothing changes. If we read, then the user must have hit this race, and the right thing to do
// is to just shut down so write a nil. This does leave us open (in theory) to hitting the same
// race on the second unconditional write, but that would require the user to call `AsyncClose`
// twice, which is user error.
select {
case child.dying <- err:
case <-child.dying:
child.dying <- nil
}
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
Expand Down

0 comments on commit fc66764

Please sign in to comment.