Skip to content

Commit

Permalink
Consumer offset management: add Client.CommitOffset and Client.FetchO…
Browse files Browse the repository at this point in the history
…ffset
  • Loading branch information
wvanbergen committed Apr 11, 2015
1 parent fb1ac37 commit 16f46ef
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
92 changes: 92 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,98 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
return nil
}

func (client *client) CommitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error {
if client.Closed() {
return ErrClosedClient
}

if err := client.commitOffset(consumerGroup, topic, partitionID, offset, metadata); err != nil {
Logger.Printf("Error committing offset for %s: %s. Trying again...", consumerGroup, err)
if err := client.RefreshCoordinator(consumerGroup); err != nil {
return err
}

return client.commitOffset(consumerGroup, topic, partitionID, offset, metadata)
}

return nil
}

func (client *client) commitOffset(consumerGroup string, topic string, partitionID int32, offset int64, metadata string) error {
var err error

broker, err := client.Coordinator(consumerGroup)
if err != nil {
return err
}

offsetCommitRequest := new(OffsetCommitRequest)
offsetCommitRequest.Version = 1
offsetCommitRequest.ConsumerGroup = consumerGroup
offsetCommitRequest.AddBlock(topic, partitionID, offset, ReceiveTime, metadata)

response, err := broker.CommitOffset(offsetCommitRequest)
if err != nil {
_ = broker.Close()
return err
}

err = response.Errors[topic][partitionID]
switch err {
case ErrNoError:
return nil
default:
return err
}
}

func (client *client) FetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error) {
if client.Closed() {
return -1, "", ErrClosedClient
}

offset, metadata, err := client.fetchOffset(consumerGroup, topic, partitionID)
if err != nil {
Logger.Printf("Error fetching offset for %s: %s. Trying again...", consumerGroup, err)
if err := client.RefreshCoordinator(consumerGroup); err != nil {
return -1, "", err
}

return client.fetchOffset(consumerGroup, topic, partitionID)
}

return offset, metadata, nil
}

func (client *client) fetchOffset(consumerGroup string, topic string, partitionID int32) (int64, string, error) {
var err error

broker, err := client.Coordinator(consumerGroup)
if err != nil {
return -1, "", err
}

offsetFetchRequest := new(OffsetFetchRequest)
offsetFetchRequest.Version = 1
offsetFetchRequest.ConsumerGroup = consumerGroup
offsetFetchRequest.AddPartition(topic, partitionID)

response, err := broker.FetchOffset(offsetFetchRequest)

if err != nil {
_ = broker.Close()
return -1, "", err
}

block := response.Blocks[topic][partitionID]
switch block.Err {
case ErrNoError:
return block.Offset, block.Metadata, nil
default:
return -1, "", err
}
}

// private broker management helpers

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
Expand Down
29 changes: 29 additions & 0 deletions functional_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ func TestFuncClientCoordinator(t *testing.T) {

safeClose(t, client)
}

func TestFuncClientOffsetManagement(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
checkKafkaAvailability(t)

c, err := NewClient(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if err := c.(*client).CommitOffset("testing_123", "multi_partition", 1, 123, "Hello world"); err != nil {
t.Fatal(err)
}

offset, metadata, err := c.(*client).FetchOffset("testing_123", "multi_partition", 1)
if err != nil {
t.Fatal(err)
}

if offset != 123 {
t.Error("Expected offset 123, got", offset)
}

if metadata != "Hello world" {
t.Errorf("Expected metadata 'Hello world', got '%s'", metadata)
}

safeClose(t, c)
}

0 comments on commit 16f46ef

Please sign in to comment.