Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High water mark offset support to the consumer #339

Merged
merged 1 commit into from
Apr 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -255,6 +256,11 @@ type PartitionConsumer interface {
// errors are logged and not returned over this channel. If you want to implement any custom errpr
// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
Errors() <-chan *ConsumerError

// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
// be used for the next message that will be produced. You can use this to determine how far behind
// the processing is.
HighWaterMarkOffset() int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, the kafka docs have "watermark" as one word, so I guess HighWatermarkOffset would be more consistent, but YMMV

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I guess that's my fault for misnaming the variable in FetchResponseBlock :(

Add it to the list of breaking changes I guess...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, grammatically I feel my capitalization closer to high-water mark instead of the incorrect high watermark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I have a problem with it being high-water mark offset; I'd have preferred high-water mark or high-water offset instead of this Frankensteinian composition. But that would be deviating from the protocol spec too much. Oh well :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for a different opinion, wikipedia doesn't even hyphenate, it just uses three distinct words ("high water mark") everywhere. So I guess this issue is contentious :)

This is fine as-is (and is consistent with FetchResponseBlock). We can revisit if anybody complains.

}

type partitionConsumer struct {
Expand All @@ -268,8 +274,9 @@ type partitionConsumer struct {
errors chan *ConsumerError
trigger, dying chan none

fetchSize int32
offset int64
fetchSize int32
offset int64
highWaterMarkOffset int64
}

func (child *partitionConsumer) sendError(err error) {
Expand Down Expand Up @@ -391,6 +398,10 @@ func (child *partitionConsumer) Close() error {
return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
Expand Down Expand Up @@ -422,6 +433,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
Expand Down
26 changes: 20 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestConsumerOffsetManual(t *testing.T) {
leader.Close()
}

func TestConsumerLatestOffset(t *testing.T) {
func TestConsumerOffsetNewest(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

Expand All @@ -69,15 +69,17 @@ func TestConsumerLatestOffset(t *testing.T) {
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
block := fetchResponse.GetBlock("my_topic", 0)
block.HighWaterMarkOffset = 14
leader.Returns(fetchResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
Expand All @@ -91,12 +93,24 @@ func TestConsumerLatestOffset(t *testing.T) {
t.Fatal(err)
}

msg := <-consumer.Messages()

// we deliver one message, so it should be one higher than we return in the OffsetResponse
if msg.Offset != 10 {
t.Error("Latest message offset not fetched correctly:", msg.Offset)
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
t.Errorf("Expected high water mark offset 14, found %d", hwmo)
}

leader.Close()
safeClose(t, consumer)
safeClose(t, master)

// we deliver one message, so it should be one higher than we return in the OffsetResponse
if consumer.(*partitionConsumer).offset != 0x010102 {
// We deliver one message, so it should be one higher than we return in the OffsetResponse.
// This way it is set correctly for the next FetchRequest.
if consumer.(*partitionConsumer).offset != 11 {
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
}
}
Expand Down
34 changes: 34 additions & 0 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,37 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {

safeClose(t, consumer)
}

func TestConsumerHighWaterMarkOffset(t *testing.T) {
checkKafkaAvailability(t)

p, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, p)

_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
if err != nil {
t.Fatal(err)
}

c, err := NewConsumer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)

pc, err := c.ConsumePartition("test.1", 0, OffsetOldest)
if err != nil {
t.Fatal(err)
}

<-pc.Messages()

if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
}

safeClose(t, pc)
}
11 changes: 8 additions & 3 deletions mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mocks

import (
"sync"
"sync/atomic"

"github.com/Shopify/sarama"
)
Expand Down Expand Up @@ -175,13 +176,13 @@ type PartitionConsumer struct {
consumed bool
errorsShouldBeDrained bool
messagesShouldBeDrained bool
highWaterMarkOffset int64
}

func (pc *PartitionConsumer) handleExpectations() {
pc.l.Lock()
defer pc.l.Unlock()

var offset int64
for ex := range pc.expectations {
if ex.Err != nil {
pc.errors <- &sarama.ConsumerError{
Expand All @@ -190,11 +191,11 @@ func (pc *PartitionConsumer) handleExpectations() {
Err: ex.Err,
}
} else {
offset++
atomic.AddInt64(&pc.highWaterMarkOffset, 1)

ex.Msg.Topic = pc.topic
ex.Msg.Partition = pc.partition
ex.Msg.Offset = offset
ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)

pc.messages <- ex.Msg
}
Expand Down Expand Up @@ -274,6 +275,10 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
}

///////////////////////////////////////////////////
// Expectation API
///////////////////////////////////////////////////
Expand Down