Skip to content

Commit

Permalink
Optionally use a ticker instead of a timer to detect timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
wmille committed Aug 22, 2017
1 parent 7bbb175 commit 87e4033
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 60 deletions.
22 changes: 9 additions & 13 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,20 @@ type Config struct {
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
MaxProcessingTime time.Duration

// The time interval between ticks of the fast checker. A value of 0
// turns off the fast checker.
// If this is set to a non-zero value, then there will be periodic
// checks to see if messages have been written to the Messages channel.
// If a message has not been written to the Messages channel since the
// last tick of the fast checker, then the timer will be set.
// Whether or not to use the fast checker. The fast checker uses a
// ticker instead of a timer to implement the timeout functionality in
// (*partitionConsumer).responseFeeder.
// If a message is not written to the Messages channel between two ticks
// of the fast checker then a timeout is detected.
// Using the fast checker should typically result in many fewer calls to
// Timer functions resulting in a significant performance improvement if
// many messages are being sent and timeouts are infrequent.
// The disadvantage of using the fast checker is that timeouts will be
// less accurate. That is, the effective timeout could be between
// `MaxProcessingTime` and `MaxProcessingTime + FastCheckerInterval`.
// For example, if `MaxProcessingTime` is 100ms and
// `FastCheckerInterval` is 10ms, then a delay of 108ms between two
// `MaxProcessingTime` and `2 * MaxProcessingTime`. For example, if
// `MaxProcessingTime` is 100ms then a delay of 180ms between two
// messages being sent may not be recognized as a timeout.
FastCheckerInterval time.Duration
UseFastChecker bool

// Return specifies what channels will be populated. If they are set to true,
// you must read from them to prevent deadlock.
Expand Down Expand Up @@ -294,7 +292,7 @@ func NewConfig() *Config {
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.FastCheckerInterval = 0
c.Consumer.UseFastChecker = false
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
Expand Down Expand Up @@ -420,8 +418,6 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
case c.Consumer.MaxProcessingTime <= 0:
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.FastCheckerInterval < 0:
return ConfigurationError("Consumer.FastCheckerInterval must be >= 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
Expand Down
59 changes: 14 additions & 45 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,67 +441,36 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage
msgSent := false
// Initialize timer without a pending send on its channel
expiryTimer := time.NewTimer(0)
<-expiryTimer.C
expiryTimerSet := false

var fastCheckerChan <-chan (time.Time)
if child.conf.Consumer.FastCheckerInterval > 0 {
fastChecker := time.NewTicker(child.conf.Consumer.FastCheckerInterval)
defer fastChecker.Stop()
fastCheckerChan = fastChecker.C
}

feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

for i, msg := range msgs {
if child.conf.Consumer.FastCheckerInterval <= 0 {
expiryTimerSet = true
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
}

messageSelect:
select {
case child.messages <- msg:
msgSent = true
if expiryTimerSet {
// The timer was set and a message was sent, stop the
// timer and resume using the fast checker
if !expiryTimer.Stop() {
<-expiryTimer.C
case <-expiryTicker.C:
if !msgSent {
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
}
expiryTimerSet = false
}
// Periodically check if messages have been sent
case <-fastCheckerChan:
if msgSent {
child.broker.input <- child
continue feederLoop
} else {
// current message has not been sent, return to select
// statement
msgSent = false
} else if !expiryTimerSet {
// No messages have been sent since the last tick,
// start the timer
expiryTimerSet = true
// If the fast checker is being used, then at least
// the time between two fast checker ticks has already
// passed since the last message was sent.
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime - child.conf.Consumer.FastCheckerInterval)
}
// message has not been sent, return to select statement
goto messageSelect
case <-expiryTimer.C:
expiryTimerSet = false
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
goto messageSelect
}
child.broker.input <- child
continue feederLoop
}
}

expiryTicker.Stop()
child.broker.acks.Done()
}

Expand Down
4 changes: 2 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func TestConsumerFastCheckerOff(t *testing.T) {

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.FastCheckerInterval = 0
config.Consumer.UseFastChecker = false
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -865,7 +865,7 @@ func TestConsumerFastCheckerOn(t *testing.T) {

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.FastCheckerInterval = 1 * time.Millisecond
config.Consumer.UseFastChecker = true
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand Down

2 comments on commit 87e4033

@nipuntalukdar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it support high level consumer (like the one bundled with Kafka Scala/Java library) where consumers in a consumer group balance/share the partitions?

@eapache
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.