Skip to content

Commit

Permalink
Merge pull request #585 from mailgun/maxim/offset_manager
Browse files Browse the repository at this point in the history
Fix offset_manager for Kafka 0.9.0.0
  • Loading branch information
wvanbergen committed Dec 13, 2015
2 parents 87ec8d7 + 1e5cc31 commit 1fdcdc2
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 11 deletions.
9 changes: 0 additions & 9 deletions functional_offset_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package sarama

import (
"os"
"testing"
)

func TestFuncOffsetManager(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
if os.Getenv("KAFKA_VERSION") == "0.9.0.0" {
t.Skip("Offset manager is broken with kafka 0.9 at the moment.")
}

setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand All @@ -24,10 +19,6 @@ func TestFuncOffsetManager(t *testing.T) {
t.Fatal(err)
}

if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
}

pom1, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
Expand Down
7 changes: 5 additions & 2 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

// Offset Manager

const groupGenerationUndefined = -1

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
Expand Down Expand Up @@ -476,8 +478,9 @@ func (bom *brokerOffsetManager) flushToBroker() {

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: groupGenerationUndefined,
}

for s := range bom.subscriptions {
Expand Down

0 comments on commit 1fdcdc2

Please sign in to comment.