Skip to content

Commit

Permalink
Merge pull request #451 from Shopify/wait-group-shutdown-error
Browse files Browse the repository at this point in the history
Fix another hypothetical wait group issue
  • Loading branch information
eapache committed May 20, 2015
2 parents c0f48c6 + 6b85c47 commit e8ad5e2
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,18 @@ func (p *asyncProducer) topicDispatcher() {
p.inFlight.Done()
continue
} else if msg.retries == 0 {
p.inFlight.Add(1)
if shuttingDown {
p.returnError(msg, ErrShuttingDown)
// we can't just call returnError here because that decrements the wait group,
// which hasn't been incremented yet for this message, and shouldn't be
pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
if p.conf.Producer.Return.Errors {
p.errors <- pErr
} else {
Logger.Println(pErr)
}
continue
}
p.inFlight.Add(1)
}

if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
Expand Down

0 comments on commit e8ad5e2

Please sign in to comment.