Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ConsumerGroups #1083

Closed
wants to merge 7 commits into from
Closed

Introduce ConsumerGroups #1083

wants to merge 7 commits into from

Conversation

dim
Copy link
Contributor

@dim dim commented Apr 15, 2018

Hi,

I've been trying to extract some of the lessons from https://github.com/bsm/sarama-cluster into a general purpose, low-level consumer group and would like to get some feedback on the approach.

An implementation of sarama-cluster with the proposed API would look something like this:

func main() {
	config := NewConfig()
	config.Version = V0_10_1_0

	// Init client
	client, err := NewClient(kafkaBrokers, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	// Init consumer manager
	consumerManager, err := NewConsumerFromClient(client)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumerManager.Close()

	// Init offset manager
	offsetManager, err := NewOffsetManagerFromClient("my_group", client)
	if err != nil {
		log.Fatalln(err)
	}
	defer offsetManager.Close()

	// Init consumer group
	group := NewConsumerGroupFromClient("my_group", client)
	defer group.Close()

	// Start rebalance/consume loop
	wg := new(sync.WaitGroup)
	for {
		sess, err := group.Subscribe([]string{"topic-a", "topic-b"})
		if err != nil {
			log.Fatalln(err)
		}

		// Consume each partition in a separate goroutine
		for topic, partitions := range sess.Claims() {
			for _, partition := range partitions {
				offsets, err := offsetManager.ManagePartition(topic, partition)
				if err != nil {
					log.Fatalln(err)
				}

				offset, _ := offsets.NextOffset()
				consumer, err := consumerManager.ConsumePartition(topic, partition, offset)
				if err != nil {
					log.Fatalln(err)
				}

				wg.Add(1)
				go func() {
					defer wg.Done()
					consume(sess, consumer, offsets)
				}()
			}
		}
		wg.Wait()
	}
}

func consume(sess ConsumerGroupSession, consumer PartitionConsumer, offsets PartitionOffsetManager) {
	defer sess.Release()
	defer offsets.Close()
	defer consumer.Close()

	for {
		select {
		case msg := <-consumer.Messages():
			log.Printf("%#v", msg)
			offsets.MarkOffset(msg.Offset, "")
		case <-sess.Done():
			return
		}
	}
}

What do you think?

@eapache
Copy link
Contributor

eapache commented Apr 16, 2018

Perhaps I'm missing a use case, but why distinguish ConsumerGroup from ConsumerGroupSession when you can only ever have one session anyway? I would think you could merge those and have NewConsumerGroup take the list of topics.

@eapache
Copy link
Contributor

eapache commented Apr 16, 2018

Also feel free to split the JoinGroup api updates into a separate PR, they're useful regardless of what happens here.

@dim
Copy link
Contributor Author

dim commented Apr 16, 2018

The consumer group joins a cluster and acquires a semi-permanent member ID on its first session. A session on the other hand lives until the next rebalance cycle. Once Done() triggers, the user must stop consuming the claimed partitions, commit offsets, Close() the session and restart it to rejoin the cluster under the same member ID.

One way or another, the consumer group life cycle requires a big for loop, either through a 'context-like' session or via three channels directly on the ConsumerGroup interface, i.e. Claims(), Rebalance() and Errors().

Thoughts?

@dim
Copy link
Contributor Author

dim commented Apr 16, 2018

So actually, I can see no better way. The consumer group is effectively an iterator over sessions. Obviously, I could move the topics argument to NewConsumerGroup, but I have been frequently asked for a new feature on sarama-cluster to allow users to change the topic subscriptions without restarting the (cluster) consumer, that's why I have decided to put it on the session initialiser.

@eapache
Copy link
Contributor

eapache commented Apr 16, 2018

allow users to change the topic subscriptions without restarting the (cluster) consumer

Are there actually benefits to doing this in terms of connection/metadata setup or whatever?

reads...

Oh, the memberID is persisted in the Group itself. I didn't realize that the Group carried any state between sessions. That makes sense.

Wait, is it even possible to change the topic subscriptions broker-side without introducing a whole new group ID? How can the broker coordinate if the different group members subscribe to different topics entirely?

Close() the session

In your sample this is called Release I think? And if I'm reading properly, it's wrong because it will get called for every partition (including the first), instead of only when all partition consumers are done?

The consumer group is effectively an iterator over sessions.

So I suppose the issue with something like func (*ConsumerGroup) Sessions() <-*Session is that you can't subscribe to the next session until the previous one is closed, which must be manually called anyway once all the various partition-consumers are shut down and offsets committed.


I wonder if it makes sense to have ConsumerGroupSession.Release(topic, partition) and track final state internally instead of making the caller use a WaitGroup?

And then ConsumerGroup.Subscribe could return a <-Session you can actually range over. If you really want to support switching up the set of topics you could add a ConsumerGroup.Unsubscribe() which closes that channel?

I'm not sure if these ideas actually end up making sense in practical usage, but I think they would make your sample sarama-cluster implementation a little simpler and more robust.

edit: I guess error-handling becomes annoying then doesn't it

}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string { return s.parent.memberID }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this just be exposed on the ConsumerGroup itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really helpful as these are only available for the duration of the session. A session can be terminated by the server (failing heartbeat) or by the client (heartbeat is stopped)

@dim
Copy link
Contributor Author

dim commented Apr 16, 2018

One idea is to make Subscribe this the previous session, if one exists. It would make things more fluid but also less obvious. I thought about exposing the iterator via a Sessions() channel before, but yes, error handling would be quite annoying.

@dim
Copy link
Contributor Author

dim commented Apr 16, 2018

... make Subscribe close/release...

},
},
{
members: map[string][]string{"M1": {"T1"}, "M2": {"T1", "T2"}},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can the broker coordinate if the different group members subscribe to different topics entirely?

Here's an example of that. Topics can be distributed unevenly even under the same group ID.

}
wg.Wait()
}()
if err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: remove this

@dim
Copy link
Contributor Author

dim commented Apr 27, 2018

@eapache I have created a high-level consumer (which would be sarama-cluster v3) on top of this patch - bsm/sarama-cluster#234. What do you think?

@jiangxl2018
Copy link

jiangxl2018 commented May 12, 2018

NewConsumerGroupFromClient
hi dim,whereit

@dim
Copy link
Contributor Author

dim commented Aug 13, 2018

Closing in favour of #1099

@dim dim closed this Aug 13, 2018
@dim dim deleted the feature/consumer-groups branch September 12, 2018 15:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants