diff --git a/client.go b/client.go index 5101a891b..f34bce1bc 100644 --- a/client.go +++ b/client.go @@ -345,6 +345,98 @@ func (client *client) RefreshCoordinator(consumerGroup string) error { return nil } +func (client *client) CommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error { + if client.Closed() { + return ErrClosedClient + } + + if err := client.commitOffset(consumerGroup, topic, partitionID, offset, metadata); err != nil { + Logger.Printf("Error committing offset for %s: %s. Trying again...", consumerGroup, err) + if err := client.RefreshCoordinator(consumerGroup); err != nil { + return err + } + + return client.commitOffset(consumerGroup, topic, partitionID, offset, metadata) + } + + return nil +} + +func (client *client) commitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error { + var err error + + broker, err := client.Coordinator(consumerGroup) + if err != nil { + return err + } + + offsetCommitRequest := new(OffsetCommitRequest) + offsetCommitRequest.Version = 1 + offsetCommitRequest.ConsumerGroup = consumerGroup + offsetCommitRequest.AddBlock(topic, partitionID, offset, ReceiveTime, metadata) + + response, err := broker.CommitOffset(offsetCommitRequest) + if err != nil { + _ = broker.Close() + return err + } + + err = response.Errors[topic][partitionID] + switch err { + case ErrNoError: + return nil + default: + return err + } +} + +func (client *client) FetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error) { + if client.Closed() { + return -1, "", ErrClosedClient + } + + offset, metadata, err := client.fetchOffset(consumerGroup, topic, partitionID) + if err != nil { + Logger.Printf("Error fetching offset for %s: %s. Trying again...", consumerGroup, err) + if err := client.RefreshCoordinator(consumerGroup); err != nil { + return -1, "", err + } + + return client.fetchOffset(consumerGroup, topic, partitionID) + } + + return offset, metadata, nil +} + +func (client *client) fetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error) { + var err error + + broker, err := client.Coordinator(consumerGroup) + if err != nil { + return -1, "", err + } + + offsetFetchRequest := new(OffsetFetchRequest) + offsetFetchRequest.Version = 1 + offsetFetchRequest.ConsumerGroup = consumerGroup + offsetFetchRequest.AddPartition(topic, partitionID) + + response, err := broker.FetchOffset(offsetFetchRequest) + + if err != nil { + _ = broker.Close() + return -1, "", err + } + + block := response.Blocks[topic][partitionID] + switch block.Err { + case ErrNoError: + return block.Offset, block.Metadata, nil + default: + return -1, "", err + } +} + // private broker management helpers // registerBroker makes sure a broker received by a Metadata or Coordinator request is registered diff --git a/functional_client_test.go b/functional_client_test.go index 83c05205c..0fa70863d 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -80,3 +80,32 @@ func TestFuncClientCoordinator(t *testing.T) { safeClose(t, client) } + +func TestFuncClientOffsetManagement(t *testing.T) { + checkKafkaVersion(t, "0.8.2") + checkKafkaAvailability(t) + + c, err := NewClient(kafkaBrokers, nil) + if err != nil { + t.Fatal(err) + } + + if err := c.(*client).CommitOffset("testing_123", "multi_partition", 1, 123, "Hello world"); err != nil { + t.Fatal(err) + } + + offset, metadata, err := c.(*client).FetchOffset("testing_123", "multi_partition", 1) + if err != nil { + t.Fatal(err) + } + + if offset != 123 { + t.Error("Expected offset 123, got", offset) + } + + if metadata != "Hello world" { + t.Errorf("Expected metadata 'Hello world', got '%s'", metadata) + } + + safeClose(t, c) +}