Skip to content

Commit

Permalink
Use a ticker instead of a timer to detect timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
wmille committed Aug 22, 2017
1 parent 87e4033 commit 9112d76
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 64 deletions.
34 changes: 15 additions & 19 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,24 @@ type Config struct {
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// The maximum amount of time the consumer expects a message takes to process
// for the user. If writing to the Messages channel takes longer than this,
// that partition will stop fetching more messages until it can proceed again.
// The maximum amount of time the consumer expects a message takes to
// process for the user. If writing to the Messages channel takes longer
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
MaxProcessingTime time.Duration

// Whether or not to use the fast checker. The fast checker uses a
// ticker instead of a timer to implement the timeout functionality in
// (*partitionConsumer).responseFeeder.
// If a message is not written to the Messages channel between two ticks
// of the fast checker then a timeout is detected.
// Using the fast checker should typically result in many fewer calls to
// Timer functions resulting in a significant performance improvement if
// many messages are being sent and timeouts are infrequent.
// The disadvantage of using the fast checker is that timeouts will be
// less accurate. That is, the effective timeout could be between
// `MaxProcessingTime` and `2 * MaxProcessingTime`. For example, if
// `MaxProcessingTime` is 100ms then a delay of 180ms between two
// messages being sent may not be recognized as a timeout.
UseFastChecker bool
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
// result in many fewer calls to Timer functions which may result in a
// significant performance improvement if many messages are being sent
// and timeouts are infrequent.
// The disadvantage of using a ticker instead of a timer is that
// timeouts will be less accurate. That is, the effective timeout could
// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
// between two messages being sent may not be recognized as a timeout.
MaxProcessingTime time.Duration

// Return specifies what channels will be populated. If they are set to true,
// you must read from them to prevent deadlock.
Expand Down Expand Up @@ -292,7 +289,6 @@ func NewConfig() *Config {
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.UseFastChecker = false
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
Expand Down
46 changes: 1 addition & 45 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
broker0.Close()
}

func TestConsumerFastCheckerOff(t *testing.T) {
func TestConsumerExpiryTicker(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
Expand All @@ -822,50 +822,6 @@ func TestConsumerFastCheckerOff(t *testing.T) {

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.UseFastChecker = false
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

// Then: messages with offsets 1 through 8 are read
for i := 1; i <= 8; i++ {
assertMessageOffset(t, <-consumer.Messages(), int64(i))
time.Sleep(2 * time.Millisecond)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func TestConsumerFastCheckerOn(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
for i := 1; i <= 8; i++ {
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
}
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 1),
"FetchRequest": NewMockSequence(fetchResponse1),
})

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.UseFastChecker = true
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand Down

0 comments on commit 9112d76

Please sign in to comment.