Implement a higher-level consumer group #1099

Merged: 10 commits, Sep 27, 2018

Conversation

@dim (Contributor) commented May 4, 2018

Related to my other PR #1083, this is another attempt, this time with a higher-level consumer API. I have been going back and forth on this one, and I can't find a better way to make it flexible enough to support all the use cases while remaining comprehensible at the same time. I would really appreciate some feedback.

@dim (Contributor, Author) commented May 10, 2018

I thought it might make some sense to outline the requirements for this.

Key Objectives

  1. Distributed consumption of topics/partitions through a (reasonably) trivial ConsumerGroup API.
  2. Ability to mark message offsets as part of the consumer process.
  3. Ability to change topic subscriptions at runtime (and trigger client-side rebalance cycle).
  4. Consume individual partitions in separate goroutines (no mandatory multiplexing).
  5. Support custom partition assignment (balancing) strategies.
  6. Ability to run custom tasks after a rebalance, once partitions have been assigned, but before consuming has started.
  7. Ability to run custom tasks after rebalance has been triggered, but before subscriptions are released.
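For orientation, a minimal usage sketch of what these objectives point towards, written against the API names the PR eventually settled on (NewConsumerGroup, Consume, and a ConsumerGroupHandler with Setup/Cleanup/ConsumeClaim); the broker address, group and topic are placeholders and error handling is trimmed:

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler is illustrative: Setup/Cleanup bracket each session (objectives 6
// and 7), and ConsumeClaim runs once per claimed partition in its own
// goroutine (objective 4).
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("%s/%d@%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		sess.MarkMessage(msg, "") // objective 2: mark offsets as part of consuming
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_0_0_0 // consumer groups need Kafka >= 0.10.2

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// Consume blocks for the duration of one session; the topic list may
		// change between iterations (objective 3).
		if err := group.Consume(context.Background(), []string{"example-topic"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}
```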

@dim (Contributor, Author) commented May 18, 2018

@eapache sorry to 'rush', but it would be good to get some feedback on this. I have created a tiny wrapper for this in https://github.com/bsm/sarama-cluster/tree/feature/v3-alt to be able to test various aspects, but I am not sure I can find a better API that is able to fulfil all the objectives above.

@eapache (Contributor) commented May 18, 2018

Sorry, I've been really busy recently with other things (Sarama is no longer related to my day-to-day at Shopify so it's been a bit of a trick to squeeze in time for it). I'll try and get to it soon, but even if it takes me another week or so, I haven't forgotten about it.

@varun06 (Contributor) commented Jun 4, 2018

is it moving forward?

@dim (Contributor, Author) commented Jun 11, 2018

@eapache sorry to be a pain, just reminding

@eapache (Contributor) commented Jun 14, 2018

Hmm, it is a complicated problem. I do like the general shape. Some thoughts:

  • I'm tempted to get rid of the Consume method and related interfaces? It's trivial to spin up a goroutine per claim if that's what you want, and the ConsumerGroupHandler interface is both confusing and dangerous due to the concurrency thing.
  • I find it a bit weird that Done is required as public on the session. Can't the claims just stop feeding messages at that point?
  • I think I understand the difference between Cancel and Close but it would be nice if one of them had a better name to make it less ambiguous.

Generally, I do think this is a pretty good approach. Given the complexity of the problem I don't think I could do any better at least.

@eapache (Contributor) commented Jun 14, 2018

The handling of groups, sessions, and claims does feel more natural in this approach than in #1083 though.

@dim (Contributor, Author) commented Jun 15, 2018

@eapache thanks, I will try to address your feedback towards the end of next week. We have also found a little bug/oddity when members are dropped server-side which needs some additional error handling code.

Finally, I am not 100% sure how to test all this (properly). The current vagrant approach is not sufficient for functionally testing cluster behaviour. In my latest sarama-cluster branch (which is based on this PR), I have created a docker-compose cluster with 3 brokers and 3 ZKs to make it work. Any thoughts?

@eapache (Contributor) commented Jun 15, 2018

In what way is the vagrant setup not sufficient? It spins up five brokers with five zookeepers, all connected via toxiproxy for faking network isolations.

@dim (Contributor, Author) commented Jun 25, 2018

@eapache I have pushed better functional tests, here's a few comments on your feedback:

I'm tempted to get rid of the Consume method and related interfaces? It's trivial to spin up a goroutine per claim if that's what you want, and the ConsumerGroupHandler interface is both confusing and dangerous due to the concurrency thing.

Yes, I agree, and I was very hesitant about this initially, but there is a substantial caveat. Once the first consume goroutine exits, it must cancel the session and wait for all other goroutines to exit too, otherwise the user is in danger of consuming messages twice or locking the session beyond the server timeout. I also couldn't find any use case where the usage pattern would differ from my implementation. To consume partitions, the user needs to spin up a goroutine for each and perform the exact same steps as I did in my code: https://github.com/Shopify/sarama/pull/1099/files#diff-9076b00c0e67e4484d0c04fb247353c7R507.
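The generic shape of that caveat, sketched outside the Sarama types (consumeOne and the partitions slice are stand-ins for the per-claim consume step, not part of this PR's API): the first goroutine to exit cancels the shared context, and nothing is released until every goroutine has finished.

```go
import (
	"context"
	"log"
	"sync"
)

// consumeAll sketches the pattern described above: one goroutine per claimed
// partition, the first goroutine to exit cancels the shared context, and the
// session is only released once every goroutine has exited.
func consumeAll(ctx context.Context, partitions []int32, consumeOne func(ctx context.Context, partition int32) error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	for _, p := range partitions {
		wg.Add(1)
		go func(partition int32) {
			defer wg.Done()
			defer cancel() // whichever goroutine exits first stops all the others
			if err := consumeOne(ctx, partition); err != nil {
				log.Println(err)
			}
		}(p)
	}
	wg.Wait() // release the session only after every consume loop has stopped
}
```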

I find it a bit weird that Done is required as public on the session. Can't the claims just stop feeding messages at that point?

I agree and I didn't have it exported on the original draft. Instead of listening for Done(), the consumers should just wait for the Messages() channel to be closed (which is what I suggest in func ExampleConsumerGroup() too). There was a reason why I decided to export it, but I cannot remember it, will remove it again.

I think I understand the difference between Cancel and Close but it would be nice if one of them had a better name to make it less ambiguous.

I agree, but again, I couldn't think of anything better. I used context terminology, i.e. Cancel and Done, but I'm happy to rename if you have a better idea.

@eapache (Contributor) commented Jun 26, 2018

Yes, I agree, and I was very hesitant about this initially, but there is a substantial caveat.

Makes sense. In that case I'm tempted to remove the Claims method instead. There's no point in tempting people if they should really use the prebuilt solution to a difficult problem. It's trivial to add back if we find a use case.

I agree, but again, I couldn't think of anything better. I used context terminology, i.e. Cancel and Done but happy rename if you have a better idea

I'm wondering if we even need Close as a public method on the session? Once the session has been cancelled and the last claim has been closed we can Close the session automatically?

Minor additional nit, I'm not a big fan of having both ConsumerGroupHandler and ConsumerGroupHandlerFunc defined. I imagine most real implementations will have state and so we should be encouraging an actual struct to conform to the interface. If somebody wants to use an unbound function they can always define the func wrapper themselves.

@dim (Contributor, Author) commented Jun 27, 2018

@eapache

I'm wondering if we even need Close as a public method on the session? Once the session has been cancelled and all the claims have been closed we can call Close automatically?

We definitely need it to support objective #7. When the session is cancelled (either by the user or by a server rebalance) the individual Consume goroutines will exit but the claims will still be held by the consumer group member until either Close() is called or session.timeout.ms is reached. This is the only opportunity window for the user to perform essential tasks. A typical use case:

  1. each consumer group member opens a file for writing at the beginning of a new session
  2. the Consume() loop transforms incoming messages and appends them to that file
  3. once rebalance is triggered, wait for all Consume() loops to exit, then flush and close the file
  4. if successful, call MarkOffset() with the latest offset for each of the claimed partitions
  5. explicitly Close() the session to release the claims

Not sure how to explain it better in the documentation. Happy to rename the Close() method to Release() instead, if that helps.

Minor additional nit, ...

I am happy to remove that interface and simply accept a func(ConsumerGroupClaim) error instead. Does that sound better?

@eapache (Contributor) commented Jun 27, 2018

This is the only opportunity window for the user to perform essential tasks.

Ah, right. Following the same logic around Consume, what if we instead expanded ConsumerGroupHandler to have a second method Cleanup() which got called automatically at the relevant point? It's a tiny bit of extra overhead for users which don't need it, but it prevents any mistakes from calling Close at the wrong time or getting otherwise confused by the lifecycle.

@eapache (Contributor) commented Jun 27, 2018

And something else this made me think of; if we include a Setup() or something as well, we could drop the Consume method entirely and just require an instance of this interface to be passed to the original Subscribe method.
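To make that concrete, here is a sketch of how the file-per-session use case from the earlier comment could map onto such an interface, assuming the Setup/Cleanup/ConsumeClaim method names discussed here; the file handling itself is illustrative:

```go
import (
	"fmt"
	"os"
	"sync"

	"github.com/Shopify/sarama"
)

// fileHandler sketches the earlier use case: open a file per session, append
// transformed messages, then flush/close the file and mark offsets once all
// ConsumeClaim loops have exited but before the claims are released.
type fileHandler struct {
	f       *os.File
	mu      sync.Mutex
	offsets map[string]map[int32]int64 // topic -> partition -> next offset
}

func (h *fileHandler) Setup(sarama.ConsumerGroupSession) error {
	f, err := os.CreateTemp("", "session-*.log")
	if err != nil {
		return err
	}
	h.f, h.offsets = f, make(map[string]map[int32]int64)
	return nil
}

func (h *fileHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		h.mu.Lock() // ConsumeClaim runs concurrently, one goroutine per partition
		fmt.Fprintf(h.f, "%s\n", msg.Value) // the "transform" step is elided
		if h.offsets[msg.Topic] == nil {
			h.offsets[msg.Topic] = make(map[int32]int64)
		}
		h.offsets[msg.Topic][msg.Partition] = msg.Offset + 1
		h.mu.Unlock()
	}
	return nil
}

// Cleanup runs after every ConsumeClaim goroutine has exited but before the
// claims are released, i.e. the window objective 7 asks for.
func (h *fileHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
	if err := h.f.Close(); err != nil {
		return err
	}
	for topic, parts := range h.offsets {
		for partition, offset := range parts {
			sess.MarkOffset(topic, partition, offset, "")
		}
	}
	return nil
}
```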

@dim (Contributor, Author) commented Jun 28, 2018

@eapache how about ^^? It is certainly safer to use but I am still not sure it's intuitive enough. What do you think?

@eapache (Contributor) commented Jun 29, 2018

I like it. My only question is how the outer loop would work now? Does Consume need to return some sort of channel to wait on?

@eapache (Contributor) commented Jun 29, 2018

Or maybe instead of taking a ConsumerGroupHandler it should take a method to construct a ConsumerGroupHandler and loop resubscribing automatically until Close is called?

edit: Or even just loop automatically on the same handler object. That would be convenient I think?

@dim (Contributor, Author) commented Jun 29, 2018

Hmm, not sure about the loop, the goal was to allow users to change their subscriptions and I don't think that a handler factory is particularly intuitive. Currently, Consume just blocks until the next rebalance (or until Close is called).

After some more fuzzy testing, we found one more issue, related to the offset manager. Currently, a partition offset manager will remain in a dirty state until it has successfully committed the stashed offsets. Unfortunately, any offset commit attempts after a rebalance are rejected by Kafka with an ErrUnknownMemberId, which means that pom.AsyncClose will remain stuck forever. There is no simple solution to it: we don't really want to discard stashed offsets, as the member will likely receive the same claims after a rebalance and could commit them in the next cycle. At the same time, we cannot re-use the same offset manager across sessions and just increase the generationID. I will have to think about this.

@eapache (Contributor) commented Jun 29, 2018

Currently, Consume just blocks until the next rebalance (or until Close is called).

Oh, that also works for me.

Re. the offset management issue, the issue is that if the POM doesn't manage to flush before the rebalance completes on the broker then it gets stuck? One mitigation (not a full solution) would be to add a little code to PartitionOffsetManager.AsyncClose() to force an immediate flush, rather than the current default which just waits for the next timer tick.

But fundamentally I don't think we can make any guarantees here. If some other client kicks off the rebalance and/or the broker rejects our offset storage request for some reason, it's always fundamentally possible for the rebalance to "leave us behind" like this. I think the best we can do is discard the stashed offsets entirely; the worst that happens is that the same offset gets consumed twice which is already part of the standard Kafka behaviour (excepting the new transactional stuff which I don't fully understand yet).

@dim (Contributor, Author) commented Jun 29, 2018 via email

@eapache (Contributor) commented Jun 29, 2018

I'm just not sure how that would solve the problem? If Kafka is going to reject the requests, there's nothing the client can do whether the request is made from the offset manager or the consumer group?

@dim (Contributor, Author) commented Jul 2, 2018

Re-balances do happen regularly and most of the time members inherit the same claims they just had in the previous session. In this case a member could just resume from its stashed offsets and commit at the next possible opportunity. Currently, we are creating a new offset manager with each session, effectively wiping all previous state. Ideally, the offset management should happen at the ConsumerGroup and not at the ConsumerGroupSession level and I don't quite see how this can be accomplished with the existing OffsetManager.

@eapache (Contributor) commented Jul 3, 2018

The fact that you had to add memberID and generation to the offset manager in the first place is a hint that it wasn't designed with rebalancing like this in mind.

At the same time, we cannot re-use the same offset manager across sessions and just increase the generationID

Why can't we re-use the same offset manager and just update the member / generation IDs when we get the new session? You'll have to add a state to the POM I think, so it can get "paused" while a rebalance is in progress, but I think that fits pretty nicely in the existing design; have the broker abandon it as if the Coordinator changed, but then just hold off on selecting the new broker until we get a kick from the consumer?

@dim (Contributor, Author) commented Jul 4, 2018

@eapache hmm, I can give it a go, but I remember running into a dead end the last time I tried. One thing I would like to clarify beforehand is: why does the OffsetManager need multiple BOMs? A consumer group can only have one active coordinator, so why not remove the brokerOffsetManager altogether and move its mainLoop and the flush to the offsetManager level? This would greatly simplify the rebalance coordination, but I'm sure there is a reason why you designed it this way.

@dim (Contributor, Author) commented Jul 4, 2018

I could provide a separate PR for the changes ^^ BTW

@eapache (Contributor) commented Jul 4, 2018

why does the OffsetManager need multiple BOMs? A consumer group can only have one active coordinator, so why not remove the brokerOffsetManager altogether and move its mainLoop and the flush to the offsetManager level?

Honestly, I just copied the broker/partition rebalance pattern from the consumer code without thinking about it too hard. I wanted to support offset tracking for multiple topic/partitions that didn't necessarily belong to the same consumer group, but even then the cluster only ever has one coordinator right?

If there's no functional reason to support multiple BOMs then I'm happy to get rid of it.

@dim (Contributor, Author) commented Jul 4, 2018

👍 I'll submit a PR for this

@AlmogBaku commented Nov 26, 2018

The motivation is to listen to a list of topics, and to dynamically listen for more.

E.g. collecting information from every topic with a specific pattern (logs_orders, logs_payments, etc.).

@varun06 (Contributor) commented Nov 26, 2018

Yeah, that makes sense. I haven't seen a mechanism to add topics at run time, but I could be wrong.

@dim (Contributor, Author) commented Nov 26, 2018

@varun06 @AlmogBaku No, there isn't, but it is quite a specific requirement and you can periodically check for new topics in the background yourself. If something is found, you can simply exit the current session and call Consume again with the updated list of topics to listen to. Subscribing to a new topic requires a rebalance anyway, and therefore a new session.
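A rough sketch of that pattern (the shared client, the 30s poll interval and the comparison are choices of this sketch, not part of the API): poll client.Topics() in the background and cancel the session's context when the list changes, so the next Consume call picks up the new subscriptions.

```go
import (
	"context"
	"log"
	"reflect"
	"sort"
	"time"

	"github.com/Shopify/sarama"
)

// resubscribeLoop re-reads the topic list before every session and ends the
// running session when the list changes, so the next Consume call subscribes
// to the updated topics.
func resubscribeLoop(client sarama.Client, group sarama.ConsumerGroup, handler sarama.ConsumerGroupHandler) error {
	for {
		if err := client.RefreshMetadata(); err != nil {
			return err
		}
		topics, err := client.Topics()
		if err != nil {
			return err
		}
		sort.Strings(topics)

		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			ticker := time.NewTicker(30 * time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					_ = client.RefreshMetadata()
					latest, err := client.Topics()
					if err != nil {
						continue
					}
					sort.Strings(latest)
					if !reflect.DeepEqual(latest, topics) {
						cancel() // ends the current session; the outer loop resubscribes
						return
					}
				}
			}
		}()

		// Blocks until the session ends (rebalance, error, or the cancel above).
		if err := group.Consume(ctx, topics, handler); err != nil {
			log.Println("consume:", err)
		}
		cancel()
	}
}
```

Creating the group with NewConsumerGroupFromClient lets it share the same client (and metadata) as the watcher.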

@varun06 (Contributor) commented Nov 26, 2018

Thanks @dim. While we are at it, a quick question, is there a stat to track how long it takes a consumeClaim function to execute?

@dim (Contributor, Author) commented Nov 26, 2018

@varun06 this very much depends on how quickly other group consumers in the cluster are able to stop their sessions in order to rebalance. There are also various other timeout settings that Kafka uses as part of its (far from trivial) rebalance algorithm. I've mostly used Kafka's own documentation and added additional notes:

@AlmogBaku commented:

@dim the reason I asked is, IIRC, because sarama-cluster used to support this feature... what was the motivation then?

@varun06 (Contributor) commented Nov 28, 2018

@dim is there an easy way to get the claim batch size (len(claim.Messages())) and the time taken to process one claim? I am not able to work it out from the code around claims. Thanks in advance.

I tried to do that in ConsumeClaim(), but that consume loop is long-lived and I don't get anything until a rebalance happens or I exit.

@dim (Contributor, Author) commented Dec 3, 2018

@varun06 I don't quite understand your question. A claim is long lived and will only exit when (one of) the handler(s) exits or when the server signals a rebalance. I have tried to explain it all here: https://godoc.org/github.com/Shopify/sarama#ConsumerGroup. You can also see an example below.
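If the goal is just measurement, nothing is built in for it, but one option is to time things inside ConsumeClaim yourself; the timedHandler type and the log output below are purely illustrative:

```go
import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

type timedHandler struct{}

func (timedHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (timedHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (timedHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	claimStart := time.Now()
	defer func() {
		// How long the claim was held, i.e. the time until the next rebalance.
		log.Printf("claim %s/%d lived %s", claim.Topic(), claim.Partition(), time.Since(claimStart))
	}()
	for msg := range claim.Messages() {
		msgStart := time.Now()
		// ... process msg ...
		sess.MarkMessage(msg, "")
		log.Printf("offset %d processed in %s", msg.Offset, time.Since(msgStart))
	}
	return nil
}
```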

@varun06 (Contributor) commented Dec 3, 2018

Thanks @dim and sorry for poor wording. I understand the long lived part of it and my batch question was stupid. So time to process a claim is also dictated by rebalance or exit, right?

@dim (Contributor, Author) commented Dec 3, 2018

Once you have a claim, it will live until the next rebalance. The rebalance can be server-side (e.g. when another consumer decides to enter or exit the consumer group) or it can be client-side (i.e. when you return inside a ConsumerGroupHandler).

@vfiset commented Feb 27, 2019

@dim Any plans on providing an example on how to use this new api ?

@varun06 (Contributor) commented Feb 27, 2019

@vfiset there is an example in godoc -> https://godoc.org/github.com/Shopify/sarama#ConsumerGroup

@vfiset commented Feb 28, 2019

Thanks, totally missed it. Not as cool as the examples for the producer on github for a novice like me but good enough 👍

@jeroenrinzema (Contributor) commented Feb 28, 2019

@varun06 is it maybe an idea to expand the examples to give new developers interested in Sarama a quick glance on how it works? It could also be used as a small "cheat sheet". I could create a PR to expand the examples.

@varun06 (Contributor) commented Feb 28, 2019

That would be great @jeroenrinzema

@jeroenrinzema mentioned this pull request on Mar 8, 2019
@jeroenrinzema (Contributor) commented:

I am trying to investigate a case where partitions are added to a Kafka topic during run time. Are the newly created partitions automatically added to the consumer group after creation? I have been trying to look through the code but did not find hints to it.

@dim (Contributor, Author) commented Mar 19, 2019

@jeroenrinzema topics can be added/removed with every new loop iteration of group.Consume. You can monitor your client.Topics in a separate goroutine, exit the current consumer session when new topics appear and adjust the subscriptions for the next consumer session.

@Ocaenyth commented:

@dim How can we recreate sarama-cluster's Whitelist feature (which automatically subscribes to any new topics that match a regexp) using the new sarama.ConsumerGroup?

@dim (Contributor, Author) commented Apr 17, 2019

@Ocaenyth Why? I don't want to overcomplicate the library. You can implement whitelist subscriptions yourself: before starting a session, just get a list of known topics from the client and select the ones that match your regexp before calling group.Consume.
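A sketch of that whitelist pattern (the helper name and regexp are illustrative):

```go
import (
	"regexp"

	"github.com/Shopify/sarama"
)

// topicsMatching implements a sarama-cluster-style whitelist by hand: fetch
// the known topics from the client and keep only the ones matching pattern.
func topicsMatching(client sarama.Client, pattern *regexp.Regexp) ([]string, error) {
	if err := client.RefreshMetadata(); err != nil {
		return nil, err
	}
	all, err := client.Topics()
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, topic := range all {
		if pattern.MatchString(topic) {
			matched = append(matched, topic)
		}
	}
	return matched, nil
}

// usage, once per session:
//   topics, err := topicsMatching(client, regexp.MustCompile(`^logs_`))
//   if err == nil {
//       err = group.Consume(ctx, topics, handler)
//   }
```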

@Ocaenyth commented:

I figured since you had implemented it in sarama-cluster to "do everything on its own" you might have implemented it in this PR as well, so I thought I was missing something. (And I thought that's what you were mentioning in this issue)

Thanks for the quick response though; now I know I'll have to take a deeper look at how this works!

@levh commented Mar 30, 2020

@dim
Hi,
I would like the ability to define that, after a rebalance, we start consuming from a specific offset (for example the high watermark offset) rather than from the last offset that wasn't committed.
For example, using OffsetNewest:

  1. Assume we are running a consumer, and some produced messages were consumed.

  2. Now let's stop the consumer, while Kafka and the producer keep sending messages to that topic. This means there might be messages in the topic that were sent but not yet consumed.

  3. After a few seconds let's start the consumer again. Now I don't want to consume the old messages, but only messages that arrive after the consumer is up again (which I guess is the high watermark offset).

I saw this link, but the README wasn't working and it is a bit hard to test:
https://github.com/mistsys/sarama-consumer

I wonder if it is possible to add such an option to "skip" unmarked offsets up to a fresh point (after the consumer is up).

@dim (Contributor, Author) commented Mar 31, 2020

@levh there is no such option, but you could - between step 2 and 3 - simply delete the consumer group offset information and resume from OffsetLatest.
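One possible way to do that with sarama's own APIs, sketched under the assumption that dropping the whole group is acceptable: ClusterAdmin.DeleteConsumerGroup removes the group together with its committed offsets (and only succeeds while no members are connected), and Consumer.Offsets.Initial is consulted only when no committed offset exists.

```go
import "github.com/Shopify/sarama"

// resetGroupToNewest drops the group's committed offsets between stopping the
// consumer (step 2) and restarting it (step 3), so nothing old is resumed.
func resetGroupToNewest(brokers []string, groupID string) error {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0 // group deletion needs Kafka >= 1.1

	admin, err := sarama.NewClusterAdmin(brokers, cfg)
	if err != nil {
		return err
	}
	defer admin.Close()
	return admin.DeleteConsumerGroup(groupID)
}

// When the consumer comes back up, have it start at the newest offset, since
// no committed offset is left for the group:
//   cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
```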

@levh commented Mar 31, 2020

@dim
Thanks, I'm not sure that I understand your solution.

  • What do you mean by "between step 2 and 3"? In Setup(session) I have no access to the initial offset (I can't see how to update it through the ConsumerGroupHandler interface).
  • Following your suggestion, how can I delete the consumer group offset? And in that case, I think that using OffsetNewest will cause the uncommitted messages produced before step 3 to be consumed.
  • Or, if by "OffsetLatest" you meant something other than OffsetNewest, I couldn't find it in the repository. I see that there is a private function chooseStartingOffset which calculates the starting offset from the offset computed in the private (s *consumerGroupSession) consume function.

I have an implementation suggestion for supporting this. Can you please consider merging it?

@levh commented Apr 1, 2020

@dim
Reading the sarama code, the initial offset is determined by the inner function func (s *consumerGroupSession) consume(topic string, partition int32) in the consumer_group.go file, while Setup is called from func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error), and it seems that no topic and partition are known inside Setup to update.

Looking at the ConsumeClaim function: ConsumeClaim is called by the private function func (s *consumerGroupSession) consume(topic string, partition int32) after we tell Kafka about the starting offset.
I'm currently working with confluent-kafka-go and, because of performance, I looked into using Sarama instead (the requested feature exists in confluent-kafka-go), so I wonder whether it is also supported in Sarama.
I even created a pull request to suggest supporting the feature.
