Skip to content

Commit

Permalink
Merge pull request #418 from Shopify/better_offset_range_handling
Browse files Browse the repository at this point in the history
Consumer: check offset before returning ConsumePartition.
  • Loading branch information
Willem van Bergen committed Apr 16, 2015
2 parents e2d70e1 + 8372105 commit 1221a24
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 19 deletions.
30 changes: 18 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,28 @@ func (child *partitionConsumer) dispatch() error {
return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
var time int64
func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
if err != nil {
return err
}
oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
if err != nil {
return err
}

switch offset {
case OffsetNewest, OffsetOldest:
time = offset
default:
if offset < 0 {
return ConfigurationError("Invalid offset")
}
switch {
case offset == OffsetNewest:
child.offset = newestOffset
case offset == OffsetOldest:
child.offset = oldestOffset
case offset >= oldestOffset && offset <= newestOffset:
child.offset = offset
return nil
default:
return ErrOffsetOutOfRange
}

child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time)
return err
return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
Expand Down
142 changes: 135 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func TestConsumerOffsetManual(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

for i := 0; i <= 10; i++ {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
Expand Down Expand Up @@ -60,9 +68,13 @@ func TestConsumerLatestOffset(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponse := new(OffsetResponse)
offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
leader.Returns(offsetResponse)
offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
Expand Down Expand Up @@ -101,6 +113,14 @@ func TestConsumerFunnyOffsets(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
Expand Down Expand Up @@ -152,10 +172,26 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest0 := new(OffsetResponse)
offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
leader0.Returns(offsetResponseNewest0)

offsetResponseOldest0 := new(OffsetResponse)
offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
leader0.Returns(offsetResponseOldest0)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
leader1.Returns(offsetResponseNewest1)

offsetResponseOldest1 := new(OffsetResponse)
offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
leader1.Returns(offsetResponseOldest1)

// we expect to end up (eventually) consuming exactly ten messages on each partition
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
for i := int32(0); i < 2; i++ {
consumer, err := master.ConsumePartition("my_topic", i, 0)
if err != nil {
t.Error(err)
}
Expand All @@ -179,7 +215,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
}
safeClose(t, consumer)
wg.Done()
}(int32(i), consumer)
}(i, consumer)
}

// leader0 provides first four messages on partition 0
Expand Down Expand Up @@ -273,6 +309,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest0 := new(OffsetResponse)
offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest0)

offsetResponseOldest0 := new(OffsetResponse)
offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest0)

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -282,6 +326,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

offsetResponseNewest1 := new(OffsetResponse)
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
leader.Returns(offsetResponseNewest1)

offsetResponseOldest1 := new(OffsetResponse)
offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
leader.Returns(offsetResponseOldest1)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -301,11 +353,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
leaderAddr := leader.Addr()
tmp := newMockBroker(t, 3)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
Expand All @@ -317,17 +371,44 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
t.Fatal(err)
}

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
leader.Returns(offsetResponseOldest)

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

offsetResponseNewest = new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
tmp.Returns(offsetResponseNewest)

offsetResponseOldest = new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
tmp.Returns(offsetResponseOldest)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
}

//redirect partition 1 back to main leader
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
tmp.Returns(fetchResponse)
metadataResponse = new(MetadataResponse)
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
time.Sleep(5 * time.Millisecond)

// now send one message to each partition to make sure everything is primed
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
fetchResponse.AddError("my_topic", 1, ErrNoError)
leader.Returns(fetchResponse)
Expand All @@ -339,6 +420,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
leader.Returns(fetchResponse)
<-c1.Messages()

// bounce the broker
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)

Expand All @@ -365,6 +447,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
// send it back to the same broker
seedBroker.Returns(metadataResponse)

time.Sleep(5 * time.Millisecond)

select {
case <-c0.Messages():
case <-c1.Messages():
Expand All @@ -384,6 +468,50 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
}()
wg.Wait()
safeClose(t, master)
tmp.Close()
}

func TestConsumerOffsetOutOfRange(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
seedBroker.Close()

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Returns(offsetResponseNewest)
leader.Returns(offsetResponseOldest)
if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
}

leader.Close()
safeClose(t, master)
}

// This example has the simplest use case of the consumer. It simply
Expand Down
25 changes: 25 additions & 0 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package sarama

import (
"math"
"testing"
)

func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
checkKafkaAvailability(t)

consumer, err := NewConsumer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if _, err := consumer.ConsumePartition("single_partition", 0, -10); err != ErrOffsetOutOfRange {
t.Error("Expected ErrOffsetOutOfRange, got:", err)
}

if _, err := consumer.ConsumePartition("single_partition", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
t.Error("Expected ErrOffsetOutOfRange, got:", err)
}

safeClose(t, consumer)
}
1 change: 1 addition & 0 deletions mockbroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
if err != nil {
t.Fatal(err)
}
Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 1221a24

Please sign in to comment.