
[WIP] Initial offset management implementation. #379

Closed
wants to merge 1 commit

Conversation

wvanbergen
Contributor

Attempt at fixing #2. This adds the following methods to Client:

  • CommitOffset(group, topic, partition, offset, metadata)
  • FetchOffset(group, topic, partition)
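
Roughly, the signatures will look like this (a sketch only: CommitOffset matches the diff below, while FetchOffset's return values are my guess at offset plus metadata and may change):

// Sketch of the additions to the Client interface. FetchOffset's return values
// are an assumption (offset, metadata, error), not final.
type Client interface {
    // ... existing methods ...

    CommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error
    FetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error)
}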

Notes

  • I still need to think about where to retry what.
  • We probably want to cache the offset coordinators in the client struct so we don't always do 2 requests.
  • I cannot get the metadata field to work. Not sure why, but it just always returns an empty string.
  • I would like to be able to submit multiple offsets in a single request. Not sure whether we should have an API that supports that, or implement batching internally.
  • This won't work for Kafka 0.8.1 obviously. We probably want to skip the tests based on what Kafka version we are running against.
  • You can also specify a timestamp when committing, but this API doesn't expose it.

@Shopify/kafka

@wvanbergen
Contributor Author

The topic is named __consumer_offsets; by default Kafka creates it with 50 partitions and a replication factor of 3.

@wvanbergen
Contributor Author

  • Coordinators are now cached
  • The functional test only runs on Kafka 0.8.2 and up.
  • Retry logic is in place, but needs a careful review.

@wvanbergen
Contributor Author

I came to realize that we should use Config.ClientID instead of specifying a consumerGroup explicitly. That would mean you cannot use a single Client to monitor the processing lag of a bunch of consumer groups, though. Not sure if that is an issue?

@@ -320,11 +325,171 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
return block.Offsets[0], nil
}

func (client *client) CommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error {
return client.tryCommitOffset(consumerGroup, topic, partitionID, offset, metadata, 5)
Contributor


in hindsight I kind of dislike this pattern, I think I'd prefer a loop. YMMV
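
i.e. roughly this shape, just to illustrate (the helper name, retry count, and backoff are placeholders):

// Sketch of a retry loop instead of threading attemptsRemaining through a
// helper. commitOffsetOnce is a hypothetical single-attempt helper, and the
// backoff value is a placeholder; time is the standard library package.
func (client *client) commitOffsetWithRetry(consumerGroup, topic string, partitionID int32, offset int64, metadata string) error {
    var err error
    for attempt := 0; attempt < 5; attempt++ {
        if err = client.commitOffsetOnce(consumerGroup, topic, partitionID, offset, metadata); err == nil {
            return nil
        }
        time.Sleep(250 * time.Millisecond) // placeholder backoff between attempts
    }
    return err
}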

@eapache
Contributor

eapache commented Mar 23, 2015

I came to realize that we should use Config.ClientID instead of specifying a consumerGroup explicitly.

Why? They are entirely different fields in the request...

return client.tryCommitOffset(consumerGroup, topic, partitionID, offset, metadata, 5)
}

func (client *client) tryCommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string, attemptsRemaining int) error {
Contributor


re. group vs id etc, I think Consumer.Group should be a config, and these should be methods on PartitionConsumer that use that config (and the topic/partition etc.)
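
i.e. something along these lines, purely as an illustration of the suggestion (neither the config field nor these methods exist):

// Hypothetical sketch only: a Consumer.Group config value, plus commit/fetch
// methods on PartitionConsumer that reuse the group, topic and partition it
// already knows about.
config.Consumer.Group = "my-group"

// pc is a PartitionConsumer for some (topic, partition):
committedOffset, metadata, err := pc.FetchOffset()
err = pc.CommitOffset(newOffset, newMetadata)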

@eapache
Contributor

eapache commented Mar 23, 2015

👍 for splitting up the functional tests

@wvanbergen
Contributor Author

I split the functional tests out into a separate PR (#388).

@wvanbergen
Contributor Author

Updated:

  • Rebased on the latest master
  • Renamed getCoordinator() to Coordinator() and refreshCoordinator()
  • Use Version 1 of the OffsetFetch and OffsetCommit requests, so metadata now works.

I am happy with the Coordinator() method now. I may extract it out of this PR into a separate one.

  • A careful look at the usage of locks would be appreciated. Does using two separate locks make sense?
  • I had to hardcode the retry Max and Backoff values, because initially Kafka has to create the _offsets topic, which takes longer than our default settings allow for. Once this is done, the coordinator call is usually much faster.
  • I will add some unit tests for just this method to test the retry behavior.

@wvanbergen
Contributor Author

I am still struggling with finding a nice API for the Fetch and Commit calls. I don't really want to add them to PartitionConsumer initially, because that requires the PartitionConsumer instance to be in scope, which can easily not be the case (see #339). Also, it wouldn't allow for batching.

So I'd like to have a generic method on Client that lets you commit/fetch multiple offsets at once, and that handles discovering the coordinator and dealing with retries. We can then build higher-level APIs on top of that.
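
Something in this direction, as a rough sketch (none of this is implemented, and the names and types are made up):

// Rough sketch only. The idea is one call per consumer group that can carry
// many partitions, with coordinator discovery, per-coordinator batching, and
// retries handled internally.
type TopicPartition struct {
    Topic     string
    Partition int32
}

type OffsetCommit struct {
    TopicPartition
    Offset   int64
    Metadata string
}

// Hypothetical additions to Client:
CommitOffsets(consumerGroup string, offsets []OffsetCommit) error
FetchOffsets(consumerGroup string, partitions []TopicPartition) (map[TopicPartition]OffsetCommit, error)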

@eapache
Contributor

eapache commented Mar 31, 2015

that requires the PartitionConsumer instance to be in scope, which can easily not be the case

I'm not sure I agree - in 99% of cases the consuming code will be

for msg := range partitionConsumer.Messages() {
    // do whatever
}

and will trivially have both in scope. I'm wondering if we should perhaps rethink the consumer group API instead. We can discuss this offline at some point - a good first step is to split the Coordinator code into a separate PR since that's uncontroversial and useful on its own.

@wvanbergen
Contributor Author

As mentioned in #339, a common use case will be to aggregate messages from different partitions into a single channel. That is a problem not just for the high-level consumer, but for #366 as well.

@eapache
Contributor

eapache commented Mar 31, 2015

I imagined #366 returning a []PartitionConsumer, not some magic new object.

@wvanbergen
Contributor Author

Rewrote this based on the latest master (which has Coordinator), and changed the implementation to be more in line with GetOffset.

The questions around API design are still outstanding.

@wvanbergen
Contributor Author

W.r.t. API design: I think we either need to move towards the model of an integrated high-level consumer, or stick to the model of a low-level consumer with utility methods for now.

Moving towards a higher-level consumer (using ZK, and/or later the new APIs that Kafka 0.8.3 or 0.9 will offer) would be in line with the JVM library. It would involve adding a new type (e.g. Consumergroup) that exposes the offset management methods.

If we are not ready to go down that path yet, I suggest we expose these methods as is on either the Client or Consumer type, so people can use them as they see fit on top of the low-level consumer we have now.

@wvanbergen
Contributor Author

Reviving this. My preferred approach:

  1. Update the implementation to batch calls together into a single request. We may need to update the method signature to indicate that a commit call doesn't necessarily mean that the committed value has been flushed. (Client.Close() will always do a final flush, and maybe we want to add FlushCommits() to trigger one manually?)
  2. As a separate PR, expose this on Consumer/PartitionConsumer.

@eapache
Contributor

eapache commented May 1, 2015

I've been thinking more about where this code should go, and I maintain that the client is the wrong spot for it, but I'm coming to agree that the consumer (or partition-consumer) is also wrong.

There's actually going to be a fair bit of code involved in proper offset management, handling flushes per-time and per-offset, handling and rebatching retries when leadership moves. It will basically be a somewhat-simplified producer (I actually played with just adapting the producer code, but there are enough differences to make that impractical).

As such, I'm coming to the conclusion that it should just be its own separate object entirely (OffsetManager or something) that handles all that logic. A preliminary API might be something like:

  • NewOffsetManager(client, consumerGroup)
  • Offset(topic, partition) (int64, error)
  • Metadata(topic, partition) (string, error)
  • Commit(topic, partition, offset)
  • SetMetadata(topic, partition, string)
  • Flush()
  • Errors() <-chan error
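
In Go terms, roughly (still just a sketch, nothing here is final):

// Sketch of the API listed above. It would be constructed from an existing
// Client via NewOffsetManager(client, consumerGroup).
type OffsetManager interface {
    // Last committed values for a partition, fetched from the coordinator.
    Offset(topic string, partition int32) (int64, error)
    Metadata(topic string, partition int32) (string, error)

    // Record new values; sending may be batched and retried in the background.
    Commit(topic string, partition int32, offset int64)
    SetMetadata(topic string, partition int32, metadata string)

    // Force any pending commits out now.
    Flush()

    // Asynchronous errors from background flushing and retries.
    Errors() <-chan error
}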

Does this make sense at all or am I on the wrong track?

@nickbruun

Having these "raw" API methods exposed would be great. I understand the point of batching, but having a straight-through, low-level method available would be a big help until Sarama matures to the proper handling @eapache suggests. It would also make it possible to implement different strategies depending on the use case.

Just out of curiosity - is there a chance of this being merged as-is?

@wvanbergen
Contributor Author

I don't mind adding "raw" building-block methods like these. However, if we are going down that path, they should be able to accept offsets for multiple partitions, not just one, so we can use them as building blocks for higher-level constructs like the offset manager, which uses batching.

@eapache
Contributor

eapache commented May 29, 2015

Functionally, all the low-level building blocks are already available; you can call client.Coordinator to get the appropriate broker, and then call broker.CommitOffset to commit the offset to Kafka. The wrappers in this PR don't actually add that much in terms of abstraction.
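
For illustration, the raw path looks roughly like this (the request type and field names follow sarama's protocol structs and may differ slightly between versions):

// Rough sketch of the existing low-level path; double-check the exact request
// fields against the sarama version you are using.
broker, err := client.Coordinator("my-group")
if err != nil {
    return err
}

request := &sarama.OffsetCommitRequest{ConsumerGroup: "my-group"}
request.AddBlock("my-topic", 0, offset, 0, "metadata") // topic, partition, offset, timestamp, metadata

response, err := broker.CommitOffset(request)
if err != nil {
    return err
}
// The response still carries a per-partition KError that needs checking.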

It's also worth noting that the more advanced handling I suggested is actually fairly far along (see the branch https://github.com/Shopify/sarama/tree/offset-manager); I just need to find some time to clean it up and test it.

@nickbruun

Ah, right, hadn't considered that approach. Thanks! Can't wait for the offset manager to be done :)

@eapache closed this May 29, 2015
@eapache deleted the offset_management branch May 29, 2015 14:52