Rebalancing issues with slow consumers #1608
Comments
Regarding 1, we've encountered a similar problem in a fairly common use case: while most of the consumed items should be processed really fast, it is fair to assume that some of them might fail due to throttling or network issues within the remote DB. In such a case, handling those failures with an exponential backoff retry is not possible, as it might take more than
I've come across this exact same problem. I'm building a consumer that will write records from a topic to a database, so the order in which messages are processed is really important. Since it depends on an external service, there is a function that retries failures with an exponential backoff. But I was able to create a scenario where, because of timeouts, the ordering guarantees were lost and the final state ended up inconsistent.

Scenario:

W1 eventually stops, since it lost its partitions, but because the heartbeat stopped, W2 processed the same data as W1, which led to an inconsistent state. If the heartbeat hadn't stopped, W2 wouldn't have been able to join until

Of course, the first issue (Consumers must deplete Messages() before they can be shut down) is still problematic. Since there is no way of knowing when the consumer lost its partitions, it is impossible to know when it should stop trying to insert the data in the database.
If anyone is in a dire state, we maintain a fork containing a branch with fixes to all the above here: https://github.com/iguazio/sarama/tree/slow-consumer (it's synced to 1.25.0 - fix commit is iguazio@c85539a). It has been running in production for a while. As the original issue states, we're still waiting for a dev to comment/approve before we invest the effort in a PR.
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
We've come across this exact same problem in sarama@1.27.2 and your fork (iguazio/sarama@1-27-0-slow-consumer) @pavius helps to get the problem solved for now. The exact steps to reproduce our case were:
Example using claim.StopConsuming chan:

    // Message is the commenter's own type; its definition is not shown here.
    // claim.StopConsuming() is the channel provided by the fork referenced above.
    func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        fmt.Println("starting a consumeClaim for ", session.MemberID())
        defer fmt.Println("consumeClaim finish for ", session.MemberID())
        for {
            select {
            case msg, ok := <-claim.Messages():
                if !ok {
                    return nil
                }
                // wait is signalled by the external processor once the message is handled.
                wait := make(chan struct{}, 1)
                m := Message{
                    session:     session,
                    consumerMsg: msg,
                    wait:        wait,
                }
                fmt.Println("received message", session.MemberID(), string(m.Key))
                select {
                case c.messages <- m: // we are processing outside, using a non buffered chan
                    fmt.Println("processing...", session.MemberID(), string(m.Key))
                    <-wait
                    fmt.Println("finish msg", session.MemberID(), string(m.Key))
                case <-claim.StopConsuming():
                    // Asked to stop before the message was handed off.
                    fmt.Println("stopping by stopConsuming - inside", session.MemberID(), string(m.Key))
                    return nil
                }
                fmt.Println("end of process message", session.MemberID(), string(m.Key))
            case <-claim.StopConsuming():
                // Asked to stop while idle.
                fmt.Println("stopping by stopConsuming - outside", session.MemberID())
                return nil
            }
        }
    }

Note:

    // ConfigsByProcessingTime scales every timeout with the per-message
    // processing time multiplied by the number of partitions.
    func ConfigsByProcessingTime(maxProcessingTime time.Duration) *sarama.Config {
        conf := sarama.NewConfig()
        conf.Version = sarama.V2_6_0_0
        conf.Net.MaxOpenRequests = 5
        conf.Consumer.Return.Errors = true
        conf.Consumer.Offsets.Initial = sarama.OffsetOldest
        const partitions = 10
        conf.ChannelBufferSize = 0
        conf.Consumer.MaxProcessingTime = maxProcessingTime * partitions
        conf.Consumer.Group.Rebalance.Timeout = (maxProcessingTime * partitions) + 5*time.Second
        conf.Net.ReadTimeout = (maxProcessingTime * partitions) + 5*time.Second
        conf.Net.WriteTimeout = (maxProcessingTime * partitions) + 5*time.Second
        conf.Net.DialTimeout = (maxProcessingTime * partitions) + 5*time.Second
        return conf
    }
Hey guys, I just ran into the same problem and I thought you can check sess.Context().Done() to know if you should stop the consumer:

    func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        wait := make(chan error)
        for msg := range claim.Messages() {
            var m broker.Message
            if err := h.kopts.Codec.Unmarshal(msg.Value, &m); err != nil {
                continue
            }
            // Run the handler in the background so the loop can react to
            // the session ending while the handler is still busy.
            go func() {
                err := h.handler(&publication{
                    m:    &m,
                    t:    msg.Topic,
                    km:   msg,
                    cg:   h.cg,
                    sess: sess,
                })
                select {
                case <-sess.Context().Done():
                case wait <- err:
                }
            }()
            select {
            case <-sess.Context().Done():
                // Return instead of break: a bare break only leaves the
                // select and the loop would keep reading messages.
                return nil
            case err := <-wait:
                if err == nil && h.subopts.AutoAck {
                    sess.MarkMessage(msg, "")
                }
            }
        }
        return nil
    }
@pavius thanks for this excellent write up. For some reason I hadn't been aware of this issue until I stumbled upon it just now. I've been browsing over your changes via the old nuclio/nuclio vendor dir and your iguazio/sarama fork and I think they look reasonable. If you'd be willing to put together separate PRs for the three cases you identified that would be great. Point 3 in particular seems a quick and easy bugfix that we should get merged sooner rather than later, and your proposed change for 2 looks reasonable to me, although I'd like to exercise it with some unit testing and sustained load somewhere myself to be sure.
The first few fetches from Kafka may only fetch data from one or two partitions, starving the rest for a very long time (depending on message size / processing time)

Once a member joins the consumer groups and receives its partitions, they are fed into the "subscription manager" from different go routines. The subscription manager then performs batching and executes a fetch for all the partitions. However, it seems like the batching logic in `subscriptionManager` is faulty, perhaps assuming that `case:` order prioritizes which `case` should be handled when all are signaled which is not the case, according to the go docs (https://golang.org/ref/spec#Select_statements):

```
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
```

For example - if you receive 64 partitions, each will be handled in their own go routine which sends the partition information to the `bc.input` channel. After an iteration there is a race between `case event, ok := <-bc.input` which will batch the request and `case bc.newSubscriptions <- buffer` which will trigger an immediate fetch of the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If the condition happens with 1 partition being in the batch (even though 63 extra partitions have been claimed but didn't make it into the batch) the fetch will ask for 1MB (by default) of messages from that single partition. If the messages are only a few bytes long and processing time is minutes, you will not perform another fetch for hours.

Contributes-to: #1608 #1897
Sarama would immediately stop heartbeating on rebalance. If the session teardown took longer than the session timeout, it would get evicted from the group. This can be fixed by ensuring that heartbeats only stop _after_ the session has completed its cleanup process. Contributes-to: #1608
The first few fetches from Kafka may only fetch data from one or two partitions, starving the rest for a very long time (depending on message size / processing time)

Once a member joins the consumer groups and receives its partitions, they are fed into the "subscription manager" from different go routines. The subscription manager then performs batching and executes a fetch for all the partitions. However, it seems like the batching logic in `subscriptionManager` is faulty, perhaps assuming that `case:` order prioritizes which `case` should be handled when all are signaled which is not the case, according to the go docs (https://golang.org/ref/spec#Select_statements):

```
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
```

For example - if you receive 64 partitions, each will be handled in their own go routine which sends the partition information to the `bc.input` channel. After an iteration there is a race between `case event, ok := <-bc.input` which will batch the request and `case bc.newSubscriptions <- buffer` which will trigger an immediate fetch of the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If the condition happens with 1 partition being in the batch (even though 63 extra partitions have been claimed but didn't make it into the batch) the fetch will ask for 1MB (by default) of messages from that single partition. If the messages are only a few bytes long and processing time is minutes, you will not perform another fetch for hours.

Contributes-to: #1608 #1897
Co-authored-by: Dominic Evans <dominic.evans@uk.ibm.com>
Is there any new update for issue 1?
The suggested changes were merged into Sarama. Closing this issue as resolved |
Problem Description
We've identified three issues that affect rebalancing when the consumer takes tens of seconds or minutes to process an event.
1: Consumers must deplete Messages() before they can be shut down
Sarama asks for the following:
This is a reasonable request since Sarama waits for all `ConsumeClaim()`s to return before re-establishing a session and joining the new generation of the consumer group. However, it communicates this by `close()`ing the buffered `Messages` channel - not something the consumer can act upon immediately, and even worse so without first reading all the messages in the channel.

If it takes 3 minutes to handle a single message (which is a viable scenario) the consumers have to set their `ChannelBufferSize` to 1 and their `Rebalance.Timeout` to 6 minutes + some slack (since the consumer will at worst have to handle the message it is handling and the one in the channel).

Proposed solution
Add a `StopConsuming()` channel to `ConsumerGroupClaim`. A consumer can then run their message handling in a background go routine and wait for completion of handling or `StopConsuming`. They can then decide whether to terminate processing immediately, wait a grace period or complete the current processing of the event (without having to deplete the Messages channel to understand that it should exit).

2: The first few fetches from Kafka may only fetch data from one or two partitions, starving the rest for a very long time (depending on message size / processing time)
Once a member joins the consumer groups and receives its partitions, they are fed into the "subscription manager" from different go routines. The subscription manager then performs batching and executes a fetch for all the partitions. However, it seems like the batching logic in `subscriptionManager` is faulty, perhaps assuming that `case:` order prioritizes which `case` should be handled when all are signaled which is not the case, according to the go docs (https://golang.org/ref/spec#Select_statements):

"If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed."

For example - if you receive 64 partitions, each will be handled in their own go routine which sends the partition information to the `bc.input` channel. After an iteration there is a race between `case event, ok := <-bc.input` which will batch the request and `case bc.newSubscriptions <- buffer` which will trigger an immediate fetch of the 1 or 2 partitions that made it into the batch.

This issue only really affects slow consumers with short messages. If the condition happens with 1 partition being in the batch (even though 63 extra partitions have been claimed but didn't make it into the batch) the fetch will ask for 1MB (by default) of messages from that single partition. If the messages are only a few bytes long and processing time is minutes, you will not perform another fetch for hours.
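The race described above can be reproduced in isolation. The following sketch is not Sarama's code: `countSelections` is a hypothetical helper that keeps a receive and a send permanently ready and counts which one `select` picks. Because the choice is pseudo-random, both cases end up being chosen, regardless of the order in which they are written.

```go
package main

import "fmt"

// countSelections runs n select statements in which both cases are
// always ready, and reports how often each one was chosen.
func countSelections(n int) (first, second int) {
	input := make(chan int, 1)  // always has a value to receive
	output := make(chan int, 1) // always has room for a send
	for i := 0; i < n; i++ {
		input <- i
		select {
		case <-input:
			// Analogous to batching one more partition.
			first++
		case output <- i:
			// Analogous to flushing the batch early.
			second++
			<-output // make room again
			<-input  // drain so the next iteration starts clean
		}
	}
	return first, second
}

func main() {
	first, second := countSelections(10000)
	fmt.Printf("receive chosen %d times, send chosen %d times\n", first, second)
}
```

With both cases ready, the early flush wins a large share of the time, which is consistent with a batch being sent when it holds only one or two partitions.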
Proposed solution
Replace the `subscriptionManager` logic with a delay based one here:
https://github.com/nuclio/nuclio/blob/1.1.x/vendor/github.com/Shopify/sarama/consumer.go#L741
3: Heartbeat timer is stopped once rebalance is detected, causing session to time out if tear down is long
The heartbeat loop exits immediately when a rebalance error is returned:
https://github.com/Shopify/sarama/blob/master/consumer_group.go#L760
If the consumers take a long time to tear down, the member will be ejected from the group due to session timeout.
Proposed solution
Stop sending heartbeats only AFTER the session has completed tearing down (I think this was the intent).
Summary
We can contribute fixes to these issues above if the solutions described here are agreed upon (we're currently running them through tests).