diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go index 7cc9ba8e9..b759669c0 100644 --- a/functional_offset_manager_test.go +++ b/functional_offset_manager_test.go @@ -1,16 +1,11 @@ package sarama import ( - "os" "testing" ) func TestFuncOffsetManager(t *testing.T) { checkKafkaVersion(t, "0.8.2") - if os.Getenv("KAFKA_VERSION") == "0.9.0.0" { - t.Skip("Offset manager is broken with kafka 0.9 at the moment.") - } - setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -24,10 +19,6 @@ func TestFuncOffsetManager(t *testing.T) { t.Fatal(err) } - if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition { - t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err) - } - pom1, err := offsetManager.ManagePartition("test.1", 0) if err != nil { t.Fatal(err) diff --git a/offset_manager.go b/offset_manager.go index 880d495f7..550739471 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -7,6 +7,8 @@ import ( // Offset Manager +const groupGenerationUndefined = -1 + // OffsetManager uses Kafka to store and fetch consumed partition offsets. type OffsetManager interface { // ManagePartition creates a PartitionOffsetManager on the given topic/partition. @@ -476,8 +478,9 @@ func (bom *brokerOffsetManager) flushToBroker() { func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest { r := &OffsetCommitRequest{ - Version: 1, - ConsumerGroup: bom.parent.group, + Version: 1, + ConsumerGroup: bom.parent.group, + ConsumerGroupGeneration: groupGenerationUndefined, } for s := range bom.subscriptions {