Offset manager: make initial offset configurable #520

Merged: 3 commits, Aug 21, 2015
8 changes: 8 additions & 0 deletions config.go
@@ -132,6 +132,10 @@ type Config struct {
Offsets struct {
// How frequently to commit updated offsets. Defaults to 10s.
CommitInterval time.Duration

+// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
+// Defaults to OffsetNewest.
+Initial int64
}
}

@@ -172,6 +176,7 @@ func NewConfig() *Config {
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 10 * time.Second
+c.Consumer.Offsets.Initial = OffsetNewest

c.ChannelBufferSize = 256

@@ -273,6 +278,9 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
+case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
+return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")

}

// validate misc shared values
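To make the new knob concrete, here is a minimal sketch of how an application would opt in once this lands. sarama.NewConfig and Config.Validate are the entry points touched by this diff; the surrounding boilerplate is illustrative.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Start from the oldest available message when no offset has been
	// committed for this group/partition yet; the default is OffsetNewest.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Anything other than OffsetOldest/OffsetNewest is rejected by the
	// new Validate() case added in this PR.
	if err := config.Validate(); err != nil {
		log.Fatal(err)
	}
}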
51 changes: 51 additions & 0 deletions functional_offset_manager_test.go
@@ -0,0 +1,51 @@
package sarama

import (
"testing"
)

func TestFuncOffsetManager(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
if err != nil {
t.Fatal(err)
}

if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
}

pom1, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
}

pom1.MarkOffset(10, "test metadata")
safeClose(t, pom1)

pom2, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
}

offset, metadata := pom2.NextOffset()

if offset != 10+1 {
t.Errorf("Expected the next offset to be 11, found %d.", offset)
}
if metadata != "test metadata" {
t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
}

safeClose(t, pom2)
safeClose(t, offsetManager)
safeClose(t, client)
}
2 changes: 1 addition & 1 deletion functional_test.go
@@ -75,7 +75,7 @@ func checkKafkaAvailability(t testing.TB) {
func checkKafkaVersion(t testing.TB, requiredVersion string) {
kafkaVersion := os.Getenv("KAFKA_VERSION")
if kafkaVersion == "" {
t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
} else {
available := parseKafkaVersion(kafkaVersion)
required := parseKafkaVersion(requiredVersion)
47 changes: 34 additions & 13 deletions offset_manager.go
@@ -111,33 +111,49 @@ func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
delete(om.boms, bom.broker)
}

+func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
+om.lock.Lock()
+defer om.lock.Unlock()
+
+delete(om.poms[pom.topic], pom.partition)
+if len(om.poms[pom.topic]) == 0 {
+delete(om.poms, pom.topic)
+}
+}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
-// Offset returns the last offset that was marked as processed and associated metadata according to the manager;
-// this value has not necessarily been flushed to the cluster yet. If you want to resume a partition consumer
-// from where it left off, remember that you have to increment the offset by one so the partition consumer will
-// start at the next message. This prevents the last committed message from being processed twice.
-Offset() (int64, string)

-// SetOffset sets the offset and metadata according to the manager; this value (or a subsequent update)
-// will eventually be flushed to the cluster based on configuration. You should only set the offset of
-// messages that have been completely processed.
-SetOffset(offset int64, metadata string)
+// NextOffset returns the next offset that should be consumed for the managed partition, accompanied
+// by metadata which can be used to reconstruct the state of the partition consumer when it resumes.
+// NextOffset() will return `config.Consumer.Offsets.Initial` and an empty metadata string if no
+// offset was committed for this partition yet.
+NextOffset() (int64, string)
Contributor: This should have named arguments as MarkOffset does below.

Contributor (author): These are return values, not arguments.

Contributor: Oh, right. You can name return values though; might be a good idea here.

Contributor (author): We don't do that for any of our interfaces, so for consistency I'll leave it at just types.

+// MarkOffset marks the provided offset as processed, alongside a metadata string that represents
+// the state of the partition consumer at that point in time. The metadata string can be used by
+// another consumer to restore that state, so it can resume consumption.
+//
+// Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately
Contributor: Should put a blank line before the note.
+// for efficiency reasons, and it may never be committed if your application crashes. This means that
+// you may end up processing the same message twice, and your processing should ideally be idempotent.
+MarkOffset(offset int64, metadata string)
Contributor: So the name change is due to the fact that we would resume from the offset after this one, right? One point of confusion here is that the name (and docstring) imply that every consumed offset must be marked: "mark as done" vs. "save the last offset".

Contributor (author): While you don't necessarily have to mark every offset, I think that is the best practice for using this API. We implement batching / commit intervals for you, so you don't have to worry about overloading the broker by calling this function often.

// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
// errors are logged and not returned over this channel. If you want to implement any custom error
// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
Errors() <-chan *ConsumerError

// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
// after which you should wait until the 'errors' channel has been drained and closed.
// It is required to call this function, or Close, before a PartitionOffsetManager object passes out
// of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying
// client.
AsyncClose()

// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
// leak memory. You must call this before calling Close on the underlying client.
@@ -205,6 +221,7 @@ func (pom *partitionOffsetManager) mainLoop() {
}
pom.parent.unrefBrokerOffsetManager(pom.broker)
}
+pom.parent.abandonPartitionOffsetManager(pom)
close(pom.errors)
return
}
@@ -290,7 +307,7 @@ func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
return pom.errors
}

-func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
+func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()

@@ -313,11 +330,15 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string
}
}

-func (pom *partitionOffsetManager) Offset() (int64, string) {
+func (pom *partitionOffsetManager) NextOffset() (int64, string) {
pom.lock.Lock()
defer pom.lock.Unlock()

-return pom.offset, pom.metadata
+if pom.offset >= 0 {
+return pom.offset + 1, pom.metadata
+} else {
+return pom.parent.conf.Consumer.Offsets.Initial, ""
+}
}

func (pom *partitionOffsetManager) AsyncClose() {
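To make the NextOffset/MarkOffset semantics concrete, a rough sketch of the resume-then-commit loop this API is designed for. The broker address, group name, topic, and the handle helper are placeholders; the sarama calls themselves (NewClient, NewOffsetManagerFromClient, ManagePartition, NewConsumerFromClient, ConsumePartition) are the existing constructors exercised elsewhere in this diff.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func handle(msg *sarama.ConsumerMessage) {
	// Placeholder for application-specific (ideally idempotent) processing.
	log.Printf("processing offset %d", msg.Offset)
}

func main() {
	config := sarama.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	client, err := sarama.NewClient([]string{"localhost:9092"}, config) // placeholder broker
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close() // deferred last-in-first-out: the client closes last

	om, err := sarama.NewOffsetManagerFromClient("my-group", client) // placeholder group
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close()

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer pom.Close() // must be closed before the underlying client

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Resume one past the last marked offset, or fall back to
	// Consumer.Offsets.Initial if nothing was ever committed.
	offset, _ := pom.NextOffset()

	pc, err := consumer.ConsumePartition("my_topic", 0, offset)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		handle(msg)
		// Mark the offset as processed; the manager batches commits on
		// Consumer.Offsets.CommitInterval, so per-message calls are cheap.
		pom.MarkOffset(msg.Offset, "")
	}
}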
54 changes: 38 additions & 16 deletions offset_manager_test.go
@@ -42,13 +42,13 @@ func initOffsetManager(t *testing.T) (om OffsetManager,
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
-coordinator *mockBroker) PartitionOffsetManager {
+coordinator *mockBroker, initialOffset int64, metadata string) PartitionOffsetManager {

fetchResponse := new(OffsetFetchResponse)
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
Err: ErrNoError,
-Offset: 5,
-Metadata: "test_meta",
+Offset: initialOffset,
+Metadata: metadata,
})
coordinator.Returns(fetchResponse)

@@ -162,12 +162,34 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
safeClose(t, testClient)
}

-func TestPartitionOffsetManagerOffset(t *testing.T) {
+func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
+om, testClient, broker, coordinator := initOffsetManager(t)
+testClient.Config().Consumer.Offsets.Initial = OffsetOldest
+
+// Kafka returns -1 if no offset has been stored for this partition yet.
+pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
+
+offset, meta := pom.NextOffset()
+if offset != OffsetOldest {
+t.Errorf("Expected offset %v. Actual: %v", OffsetOldest, offset)
+}
+if meta != "" {
+t.Errorf("Expected metadata to be empty. Actual: %q", meta)
+}
+
+safeClose(t, pom)
+safeClose(t, om)
+broker.Close()
+coordinator.Close()
+safeClose(t, testClient)
+}
+
+func TestPartitionOffsetManagerNextOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
-pom := initPartitionOffsetManager(t, om, coordinator)
+pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")

-offset, meta := pom.Offset()
-if offset != 5 {
+offset, meta := pom.NextOffset()
+if offset != 6 {
t.Errorf("Expected offset 6. Actual: %v", offset)
}
if meta != "test_meta" {
@@ -181,18 +203,18 @@ func TestPartitionOffsetManagerOffset(t *testing.T) {
safeClose(t, testClient)
}

-func TestPartitionOffsetManagerSetOffset(t *testing.T) {
+func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
-pom := initPartitionOffsetManager(t, om, coordinator)
+pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
coordinator.Returns(ocResponse)

-pom.SetOffset(100, "modified_meta")
-offset, meta := pom.Offset()
+pom.MarkOffset(100, "modified_meta")
+offset, meta := pom.NextOffset()

-if offset != 100 {
+if offset != 101 {
t.Errorf("Expected offset 101. Actual: %v", offset)
}
if meta != "modified_meta" {
@@ -208,7 +230,7 @@ func TestPartitionOffsetManagerSetOffset(t *testing.T) {

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
-pom := initPartitionOffsetManager(t, om, coordinator)
+pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

// Error on one partition
ocResponse := new(OffsetCommitResponse)
@@ -265,7 +287,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
ocResponse5.AddError("my_topic", 0, ErrNoError)
newCoordinator.Returns(ocResponse5)

-pom.SetOffset(100, "modified_meta")
+pom.MarkOffset(100, "modified_meta")

err := pom.Close()
if err != nil {
@@ -282,7 +304,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
// Test of recovery from abort
func TestAbortPartitionOffsetManager(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
-pom := initPartitionOffsetManager(t, om, coordinator)
+pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

// this triggers an error in the CommitOffset request,
// which leads to the abort call
@@ -300,7 +322,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) {
ocResponse.AddError("my_topic", 0, ErrNoError)
newCoordinator.Returns(ocResponse)

-pom.SetOffset(100, "modified_meta")
+pom.MarkOffset(100, "modified_meta")

safeClose(t, pom)
safeClose(t, om)