Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix assign followed by fast revoke during rebalance #1294

Merged

Conversation

ytalashko
Copy link
Contributor

@ytalashko ytalashko commented Jul 31, 2024

When an assign and revoke for the same partition follow each other very quickly, in the same poll, right now we disregard the revoke and incorrectly start a stream for the partition. Initially this is not a problem because the stream will simply not get any records. However, when later the partition is assigned again, a second stream is started for that partition, and both streams will receive all the records of the partition, leading to duplicated processing.

With this change, an assigned and immediately revoked partition will be ignored (no stream is started, the partition will not be counted in metrics and it will not be reported in the diagnostics callback). The same change is made for an assign followed by a 'lost'.

Another implementation has been considered: in the rebalance listener maintain a list of assign/revoke/lost events instead of sets with assigned/revoked/lost partitions. However, this is a bigger change and the only immediate benefit is that we can correctly report the number of assigned/revoked/lost partitions in the metrics.

@ytalashko ytalashko requested a review from erikvanoosten as a code owner July 31, 2024 17:37
@ytalashko
Copy link
Contributor Author

ytalashko commented Aug 2, 2024

Hey there, @svroonland, @guizmaii,
Not sure you are the right ones to ask, but I am just tagging you here, in case anyone can take a look at this MR.
Sorry for bothering or in case you are not the right persons.
Overall, I just want to have a version with this fix, as the bug really brings troubles for a project I'm currently working on.
For sure, I can create my own custom build with this fix, but I would like not to, and use a public version.
Thanks a lot in advance!

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Aug 2, 2024

Most maintainers might be on holidays (I am). I recently rewrote this part so I'd like to look at it when I am back.

@ytalashko
Copy link
Contributor Author

Most maintainers might be on holidays (I am). I recently rewrote this part so I'd like to look at it when I am back.

Got it, @erikvanoosten sounds good, thanks for that and for letting me know.
Have a great holidays!

Copy link
Collaborator

@svroonland svroonland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took me a while, but I can see now when this would happen (exactly as you describe). LGTM.

I'll leave merging up to Erik as he wanted to have a look as well.

@erikvanoosten
Copy link
Collaborator

Hi @ytalashko . Thanks again for this PR. I am wondering, is this a theoretical issue, or did you see this happening in practice? (Note, IMHO it should be fixed either way.)

Although the proposed change will work, it goes against the (never documented) idea that the rebalance listener should be as simple and short as possible and not take any decisions unless absolutely necessary. Another downside of this approach is that logging, the diagnostics callbacks and metrics will be incorrect.

I will propose another change shortly.

erikvanoosten added a commit that referenced this pull request Aug 14, 2024
As discovered by @ytalashko (see #1294), it is possible that a partition is assigned and immediately revoked in the same poll. No stream should be started for these partitions.

Unlike #1294, this PR does _not_ hide the assigned+revoked partitions from the diagnostic events and rebalance metrics. In addition, this PR also supports the unlikely event of a assigned+lost partition.
@ytalashko
Copy link
Contributor Author

ytalashko commented Aug 14, 2024

Hey, @erikvanoosten,

is this a theoretical issue, or did you see this happening in practice?

It is practical, as I mentioned in my previous comment as the bug really brings troubles for a project I'm currently working on.

it goes against the (never documented) idea that the rebalance listener should be as simple and short as possible and not take any decisions unless absolutely necessary.

Could you, please, give the guidance, where I can get this info, cause I never heard of it? I thought it is for triggering custom actions when the set of partitions assigned or revoked, as stated in the documentation for the rebalance listener.
Also, this line assignedTps = assignedTps -- revoked is on the same level of changes as for example revokedTps = revokedTps ++ revoked and other similar lines, it just aggregates rebalance events from kafka into RebalanceEvent instance defined in the library, which itself means it does not breaks any new ideas.

Also, I implemented it this way, 'cause it seems to be the right place from the perspective of responsibility segregation and simplicity. It is the same idea as @svroonland mentioned in #1298 (comment), RebalanceEvent (as I understand) represents aggregated state of rebalancing happened between the last handling of RebalanceEvent and "now". The aggregation of RebalanceEvent happens in the ConsumerRebalanceListener, and this scenario of fast assign-revoke is part of RebalanceEvent aggregation to me.
With this said, I believe current approach in this PR is following currently implemented approach in the lib.
Also, as mentioned it gives the simplest fix to the issue, which is seems like a good thing to me, to have a simpler solution, unless we need to have a more complex one.
Otherwise, if the lib wanna be transparent about all rebalances and revokes, it should omit even line like assignedTps = assignedTps ++ assigned, 'cause it can hide same partition(s) being assigned (and revoked) multiple times. This way, #1294 (comment) contradicts to the current lib implementation, as per me.
Maybe we can get thoughts of others, to figure out the best approach, wdyt?

Another downside of this approach is that logging, the diagnostics callbacks and metrics will be incorrect.
I will propose another change shortly.

As noted in this comment above, the lib currently is not transparrent to metrics with lines like assignedTps = assignedTps ++ assigned and revokedTps = revokedTps ++ revoked. If you want to be transparent, let me introduce it into metrics, even though, at the moment, I think the best fix is in this closed PR (but Im open to the other reasonable ideas).
Also, I would really like to fix this issue, and I could introduce the best solution we can come up with, wdyt? Not sure why to instantly close this PR, and opening another one, instead of figuring out the best solution and letting me introduce it, as the one who found the bug and proposed fix, I would love that. It is what I saw as a nice collaboration on an open source project, especially from maintainers. It is not some new feature (to instantly reject it), but a fix to a somewhat major bug I'd say. In any way, why not to discuss to come up with what best for the library. Wdyt about this?

Also, even if we want to have some more comprehensive solution, I would vote to have a fixed version faster, and then to "optimize" the fix, since it is a pretty tricky bug which is hard to detect, and some lib users may be impacted without even knowing about that.
I may suggest the next steps on this issue:

  1. Merge this fix & release fixed version
  2. Optimize the fix & align with diagnostics & metrics as a subsequent effort (PR)

Wdyt?

Also, the proposed fix in #1298 does bring another bug, 'cause it ignores the sequence of rebalance events calls, e.g. revoke first, or assign first in case of the same partitions. Regarding the assign lost sequence, I thought it should be impossible in practice, so hadn't added handling for it, but not sure. Why the partition should be lost right after it just assigned, it should be revoked first. In any case, if you think it is better to handle this scenario as well, we can do so.
cc @svroonland @guizmaii

@erikvanoosten erikvanoosten reopened this Aug 15, 2024
@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Aug 15, 2024

is this a theoretical issue, or did you see this happening in practice?

It is practical, as I mentioned in my previous comment as the bug really brings troubles for a project I'm currently working on.

Thanks, good to know! Kafka never stops surprising me.

it goes against the (never documented) idea that the rebalance listener should be as simple and short as possible and not take any decisions unless absolutely necessary.

Could you, please, give the guidance, where I can get this info, cause I never heard of it?

This is not a general idea and it is also not documented anywhere. This idea is purely based on experience with this library. In the past there was much more logic in the rebalance listener and because of that it became very hard to make certain changes. This was resolved by moving the logic to the main loop, and making the rebalance listener only register what happened and act on it as little as possible.

Also, I would really like to fix this issue, and I could introduce the best solution we can come up with, wdyt? Not sure why to instantly close this PR, and opening another one, instead of figuring out the best solution and letting me introduce it, as the one who found the bug and proposed fix, I would love that. It is what I saw as a nice collaboration on an open source project, especially from maintainers.

My apologies! I am very much used to most people that are happy to hand a problem off to the maintainers. I should have been more sensitive and realize that this does not apply to everybody. In addition, IMHO maintainers should be open to help other people get into the project and I failed on that aspect.

Also, the proposed fix in #1298 does bring another bug, 'cause it ignores the sequence of rebalance events calls, e.g. revoke first, or assign first in case of the same partitions.

Yes, that is an excellent observation 🙏 ! I have closed #1298 because of it.

To fix this according to the idea that the rebalance listener should do as little as possible, it should maintain a list of changes instead of merging everything into a few sets. However, that will be a bigger change with the only immediate benefit being that metrics can be correct (see next paragraph).

I am also still going back and forward on whether we should let the library user know that a partition was assigned and immediately revoked. When I was writing #1298 I realized that most of the time you really don't care and it is a burden that you need to take this into account.

In short, I have gone full circle and think we should follow the approach of this PR now. In addition, we can keep in mind a future change in which we make the rebalance listener smarter by making it keep a list of changes instead of the current approach.

@ytalashko could you please make a change such that assign+lost is handled the same as assign+revoke?

If you want, could you also take a look at adding a unit test in RunloopSpec? (Note, we only started writing unit tests for Runloop recently, so there aren't many unit tests yet. We do have many integration tests.)

@ytalashko
Copy link
Contributor Author

Hey, @erikvanoosten,
Thanks a lot for taking my considerations into account and for your reply!

This is not a general idea and it is also not documented anywhere. This idea is purely based on experience with this library. In the past there was much more logic in the rebalance listener and because of that it became very hard to make certain changes. This was resolved by moving the logic to the main loop, and making the rebalance listener only register what happened and act on it as little as possible.

Got it, thanks for sharing this info.

To fix this according to the idea that the rebalance listener should do as little as possible, it should maintain a list of changes instead of merging everything into a few sets. However, that will be a bigger change with the only immediate benefit being that metrics can be correct (see next paragraph).
... In addition, we can keep in mind a future change in which we make the rebalance listener smarter by making it keep a list of changes instead of the current approach.

Yeah, that's sounds great

@ytalashko could you please make a change such that assign+lost is handled the same as assign+revoke?

Sure, will do

If you want, could you also take a look at adding a unit test in RunloopSpec? (Note, we only started writing unit tests for Runloop recently, so there aren't many unit tests yet. We do have many integration tests.)

Sure, 👍

Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Just a few small comments.

Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're good to merge. @ytalashko do you agree?

@ytalashko
Copy link
Contributor Author

I think we're good to merge. @ytalashko do you agree?

Yeah, sounds good, to me, thanks @erikvanoosten!

@erikvanoosten erikvanoosten merged commit f2de848 into zio:master Aug 15, 2024
14 checks passed
@ytalashko ytalashko deleted the fix-fast-assign-revoke-during-rebalance branch August 15, 2024 19:14
@svroonland
Copy link
Collaborator

A bit late, but I was wondering: although a partition stream is started, does it actually get records?

@erikvanoosten
Copy link
Collaborator

I checked that. If we receive records for a partition that does not have a running stream, those records are silently ignored. Not great, but in thus case helpful.

@svroonland
Copy link
Collaborator

But this issue is about the inverse situation: there is a running stream but it doesn't get any records. Right?

@ytalashko
Copy link
Contributor Author

ytalashko commented Aug 18, 2024

A bit late, but I was wondering: although a partition stream is started, does it actually get records?

Yeah, both streams for the same partition received same records, creating many duplicates.
To note, I haven't investigated the running behaviour of those duplicated streams too deeply, e.g. for how long they were running (but by the amount of duplicates, I can assume for quite long, maybe for as long as instance lived), or if both streams received "full copy" of the partition, or maybe some stream received only part of messages.

@erikvanoosten
Copy link
Collaborator

You need to enable commitSafeRebalance (not the exact name) to prevent duplicates during a rebalance.

@ytalashko
Copy link
Contributor Author

ytalashko commented Aug 18, 2024

You need to enable commitSafeRebalance (not the exact name) to prevent duplicates during a rebalance.

Yeah, this is a good point, thanks. The services (consumers) are using rebalance safe commits. The duplicated messages were coming only from specific partitions, from those with the duplicated partition streams, and far past rebalance events occurrence.

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Aug 18, 2024

I was going to write that this is not possible, because the broker decides what to sent to the client. But that is not entirely true, perhaps, because we have a stream, we also resume the partition, and this causes the broker to send records, even though the partition is not assigned. I am just guessing, we'd need to experiment to find out if this is indeed the case.

@ytalashko
Copy link
Contributor Author

ytalashko commented Aug 18, 2024

I was going to write that this is not possible, because the broker decides what to sent to the client. But that is not entirely true, perhaps, because we have a stream, we also resume the partition, and this causes the broker to send records, even though the partition is not assigned. I am just guessing, we'd need to experiment to find out if this is indeed the case.

Maybe you are right about this is not possible. It is interesting through if that's the case, or not.
Back to my case:
Let's say we have two consumers in the group and they both have streams running for the same partition. Im not sure those two streams will really get the duplicated messages. For my project, duplicated streams were running within different consumers in the group and also within the same consumer.
I haven't exactly researched if duplicated messages processing happens only in case of duplicated streams under the same consumer or different consumers, or both cases. Only confirmed that duplicated messages processing happens in case of duplicated streams (at the time of investigation, was more focused on finding the root cause of the issue).
Maybe it happens only in case of duplicated streams within the same consumer in the group, but Im not sure.

@erikvanoosten
Copy link
Collaborator

Back to my case:

I was describing your case! :)
Because of the bug (which is now fixed due to this PR) we could have a stream (even thought the partition was no longer assigned), leading to a partition resume, presumably leading to duplicated records.

I haven't exactly researched if duplicated messages processing happens only in case of duplicated streams under the same consumer or different consumers, or both cases. Only confirmed that duplicated messages processing happens in case of duplicated streams (at the time of investigation, was more focused on finding the root cause of the issue).

Except for bugs, it is not possible that you would get duplicated records in a single consumer. That is not how the java client works. So I'll put my money on duplication across different consumers.

@erikvanoosten
Copy link
Collaborator

Arrchhh. Scratch the last few messages. I just looked in the code (https://github.com/zio/zio-kafka/blob/master/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala#L437). Zio-kafka only resumes partitions that are actually assigned. So the bug solved in this PR is very minor, as far as I can see, it can not have lead to duplicate records.

@ytalashko
Copy link
Contributor Author

ytalashko commented Aug 19, 2024

Except for bugs, it is not possible that you would get duplicated records in a single consumer. That is not how the java client works.

It is not java-clients lib which created duplication of messages, it is zio-kafka lib.

Maybe I've not explained it well.
Let's consider same consumer with duplicated streams, it could even be observed by looking into code:
Basically, with the bug fixed in this PR, we could have end up getting two streams for the same partition running in the same consumer (same instance of some service).
When zio-kafka fetched messages from broker, it then sends those messages to the streams https://github.com/zio/zio-kafka/blob/master/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala#L390-L406. In case there is two streams for the same partition, both those streams will get same messages (this where duplication is created).
And the logic distributing messages to streams works fine, it's just the circumstances (state) under which it is triggered are "broken".

@svroonland
Copy link
Collaborator

Ouch, that's right. That bit of code is fine under the assumption that no duplicate TopicPartitions are in the streams Chunk, but that's not guaranteed here.

In this line the duplicate partition would be selected for being started.

@svroonland
Copy link
Collaborator

In that case I'd say that this is not a minor or rare issue, but quite a proper bug..

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Aug 19, 2024

Ouch! Yes, you are correct! That is a major bug indeed! I never considered that a partition could be assigned, revoked and assigned again.

@ytalashko
Copy link
Contributor Author

In this line the duplicate partition would be selected for being started.

Yeah, exactly, and, to note, this line is also fine, it is not it's responsibility to judge the input its given, at least from my point of view.

@erikvanoosten
Copy link
Collaborator

@ytalashko I have updated the description of the PR to describe the situation to the latest insights. Can you check it please?

@ytalashko
Copy link
Contributor Author

@ytalashko I have updated the description of the PR to describe the situation to the latest insights. Can you check it please?

Thanks, just checked, looks good, 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants