Fix sarama consumer deadlock #2587
@@ -50,10 +50,11 @@ type Consumer struct {
    partitionMapLock sync.Mutex
    partitionsHeld int64
    partitionsHeldGauge metrics.Gauge

    doneWg sync.WaitGroup
}

type consumerState struct {
    wg sync.WaitGroup
    partitionConsumer sc.PartitionConsumer
}
@@ -78,17 +79,11 @@ func (c *Consumer) Start() {
        c.logger.Info("Starting main loop")
        for pc := range c.internalConsumer.Partitions() {
            c.partitionMapLock.Lock()
            if p, ok := c.partitionIDToState[pc.Partition()]; ok {
                // This is a guard against simultaneously draining messages
                // from the last time the partition was assigned and
                // processing new messages for the same partition, which may lead
                // to the cleanup process not completing
                p.wg.Wait()
            }
            c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
            c.partitionIDToState[pc.Partition()].wg.Add(2)
            c.partitionMapLock.Unlock()
            c.partitionMetrics(pc.Partition()).startCounter.Inc(1)

            c.doneWg.Add(2)
            go c.handleMessages(pc)
            go c.handleErrors(pc.Partition(), pc.Errors())
        }
@@ -97,31 +92,33 @@ func (c *Consumer) Start() {

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
    c.partitionMapLock.Lock()
    for _, p := range c.partitionIDToState {
        c.closePartition(p.partitionConsumer)
        p.wg.Wait()
    }
    c.partitionMapLock.Unlock()
    c.deadlockDetector.close()
    // Close the internal consumer, which will close each partition consumers' message and error channels.
    c.logger.Info("Closing parent consumer")
    return c.internalConsumer.Close()
    err := c.internalConsumer.Close()

    c.logger.Debug("Closing deadlock detector")
    c.deadlockDetector.close()

    c.logger.Debug("Waiting for messages and errors to be handled")
    c.doneWg.Wait()

    return err
}

// handleMessages handles incoming Kafka messages on a channel
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
    c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
    c.partitionMapLock.Lock()
    c.partitionsHeld++
    c.partitionsHeldGauge.Update(c.partitionsHeld)
    wg := &c.partitionIDToState[pc.Partition()].wg
    c.partitionMapLock.Unlock()
    defer func() {
        c.closePartition(pc)
        wg.Done()
        c.partitionMapLock.Lock()
        c.partitionsHeld--
        c.partitionsHeldGauge.Update(c.partitionsHeld)
        c.partitionMapLock.Unlock()
        c.doneWg.Done()
    }()

    msgMetrics := c.newMsgMetrics(pc.Partition())
@@ -165,12 +162,10 @@ func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
    c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

// handleErrors handles incoming Kafka consumer errors on a channel
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
    c.logger.Info("Starting error handler", zap.Int32("partition", partition))
    c.partitionMapLock.Lock()
    wg := &c.partitionIDToState[partition].wg
    c.partitionMapLock.Unlock()
    defer wg.Done()
    defer c.doneWg.Done()

    errMetrics := c.newErrMetrics(partition)
    for err := range errChan {
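For reference, here is the new shutdown path consolidated from the added lines in the Close() hunk above. This is a reconstruction for readability, not an authoritative copy of the final file:

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
    // Close the internal consumer, which will close each partition consumers' message and error channels.
    c.logger.Info("Closing parent consumer")
    err := c.internalConsumer.Close()

    c.logger.Debug("Closing deadlock detector")
    c.deadlockDetector.close()

    c.logger.Debug("Waiting for messages and errors to be handled")
    c.doneWg.Wait()

    return err
}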
Are you sure this is safe to remove? From the comment it seems like a case where a partition gets re-assigned to the same service instance, so there's potentially another partition consumer for this partition ID already running (or, more likely, shutting down).
@vprithvi do you have any more color to add to this condition?
We noticed the case that you described a couple of times, but weren't able to reproduce it in tests.
Since then, many things have changed with both Kafka and Sarama. I think we can remove this now in the interest of simplifying, and revisit if we see problems.
How did it manifest when it happened? Committed offsets going backwards?
IIUC, this happens when new messages arrive on the partition (because the Messages() channel isn't closed yet) after the partition consumer was closed (via the c.closePartition(p.partitionConsumer) call), causing a new partition consumer to be created.

This PR proposes to solve this by calling c.internalConsumer.Close() first within Close(), which closes the Messages() channel, preventing new messages from arriving, and then closes the Partitions() channel, preventing new PartitionConsumers from being created.
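To illustrate the close-then-wait ordering described above, here is a minimal, self-contained sketch. It uses simplified stand-in types (plain channels instead of sarama's PartitionConsumer, and a toy consumer rather than the actual Jaeger code): a single shared WaitGroup tracks the handler goroutines, and shutdown closes the upstream channels before waiting, so the range loops are guaranteed to terminate and the final Wait cannot block forever.

package main

import (
    "fmt"
    "sync"
)

// consumer is a simplified stand-in for the ingester Consumer: one shared
// WaitGroup (doneWg) tracks the message and error handler goroutines.
type consumer struct {
    messages chan string
    errors   chan error
    doneWg   sync.WaitGroup
}

func (c *consumer) start() {
    c.doneWg.Add(2) // one message handler + one error handler
    go func() {
        defer c.doneWg.Done()
        for msg := range c.messages { // exits once messages is closed
            fmt.Println("msg:", msg)
        }
    }()
    go func() {
        defer c.doneWg.Done()
        for err := range c.errors { // exits once errors is closed
            fmt.Println("err:", err)
        }
    }()
}

// close mirrors the ordering from the PR: close the channels first (the real
// code does this via internalConsumer.Close()), then wait for the handlers.
// Because the channels are already closed, the range loops above drain and
// return, so doneWg.Wait() cannot block forever.
func (c *consumer) close() {
    close(c.messages)
    close(c.errors)
    c.doneWg.Wait()
}

func main() {
    c := &consumer{messages: make(chan string, 1), errors: make(chan error, 1)}
    c.start()
    c.messages <- "span batch"
    c.close()
}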