Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Initial offset management implementation. #379

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,98 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
return nil
}

func (client *client) CommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error {
if client.Closed() {
return ErrClosedClient
}

if err := client.commitOffset(consumerGroup, topic, partitionID, offset, metadata); err != nil {
Logger.Printf("Error committing offset for %s: %s. Trying again...", consumerGroup, err)
if err := client.RefreshCoordinator(consumerGroup); err != nil {
return err
}

return client.commitOffset(consumerGroup, topic, partitionID, offset, metadata)
}

return nil
}

func (client *client) commitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error {
var err error

broker, err := client.Coordinator(consumerGroup)
if err != nil {
return err
}

offsetCommitRequest := new(OffsetCommitRequest)
offsetCommitRequest.Version = 1
offsetCommitRequest.ConsumerGroup = consumerGroup
offsetCommitRequest.AddBlock(topic, partitionID, offset, ReceiveTime, metadata)

response, err := broker.CommitOffset(offsetCommitRequest)
if err != nil {
_ = broker.Close()
return err
}

err = response.Errors[topic][partitionID]
switch err {
case ErrNoError:
return nil
default:
return err
}
}

func (client *client) FetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error) {
if client.Closed() {
return -1, "", ErrClosedClient
}

offset, metadata, err := client.fetchOffset(consumerGroup, topic, partitionID)
if err != nil {
Logger.Printf("Error fetching offset for %s: %s. Trying again...", consumerGroup, err)
if err := client.RefreshCoordinator(consumerGroup); err != nil {
return -1, "", err
}

return client.fetchOffset(consumerGroup, topic, partitionID)
}

return offset, metadata, nil
}

func (client *client) fetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error) {
var err error

broker, err := client.Coordinator(consumerGroup)
if err != nil {
return -1, "", err
}

offsetFetchRequest := new(OffsetFetchRequest)
offsetFetchRequest.Version = 1
offsetFetchRequest.ConsumerGroup = consumerGroup
offsetFetchRequest.AddPartition(topic, partitionID)

response, err := broker.FetchOffset(offsetFetchRequest)

if err != nil {
_ = broker.Close()
return -1, "", err
}

block := response.Blocks[topic][partitionID]
switch block.Err {
case ErrNoError:
return block.Offset, block.Metadata, nil
default:
return -1, "", err
}
}

// private broker management helpers

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
Expand Down
29 changes: 29 additions & 0 deletions functional_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ func TestFuncClientCoordinator(t *testing.T) {

safeClose(t, client)
}

func TestFuncClientOffsetManagement(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
checkKafkaAvailability(t)

c, err := NewClient(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if err := c.(*client).CommitOffset("testing_123", "multi_partition", 1, 123, "Hello world"); err != nil {
t.Fatal(err)
}

offset, metadata, err := c.(*client).FetchOffset("testing_123", "multi_partition", 1)
if err != nil {
t.Fatal(err)
}

if offset != 123 {
t.Error("Expected offset 123, got", offset)
}

if metadata != "Hello world" {
t.Errorf("Expected metadata 'Hello world', got '%s'", metadata)
}

safeClose(t, c)
}