Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional fast checker to (*partitionConsumer).responseFeeder #933

Merged
merged 3 commits into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,23 @@ type Config struct {
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// The maximum amount of time the consumer expects a message takes to process
// for the user. If writing to the Messages channel takes longer than this,
// that partition will stop fetching more messages until it can proceed again.
// The maximum amount of time the consumer expects a message takes to
// process for the user. If writing to the Messages channel takes longer
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
// If a message is not written to the Messages channel between two ticks
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
// result in many fewer calls to Timer functions which may result in a
// significant performance improvement if many messages are being sent
// and timeouts are infrequent.
// The disadvantage of using a ticker instead of a timer is that
// timeouts will be less accurate. That is, the effective timeout could
// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
// between two messages being sent may not be recognized as a timeout.
MaxProcessingTime time.Duration

// Return specifies what channels will be populated. If they are set to true,
Expand Down
36 changes: 19 additions & 17 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,35 +440,37 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage
expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
expireTimedOut := false
msgSent := false

feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

for i, msg := range msgs {
if !expiryTimer.Stop() && !expireTimedOut {
// expiryTimer was expired; clear out the waiting msg
<-expiryTimer.C
}
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
expireTimedOut = false

messageSelect:
select {
case child.messages <- msg:
case <-expiryTimer.C:
expireTimedOut = true
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
msgSent = true
case <-expiryTicker.C:
if !msgSent {
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
} else {
// current message has not been sent, return to select
// statement
msgSent = false
goto messageSelect
}
child.broker.input <- child
continue feederLoop
}
}

expiryTicker.Stop()
child.broker.acks.Done()
}

Expand Down
42 changes: 42 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,48 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
broker0.Close()
}

func TestConsumerExpiryTicker(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
for i := 1; i <= 8; i++ {
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
}
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 1),
"FetchRequest": NewMockSequence(fetchResponse1),
})

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

// Then: messages with offsets 1 through 8 are read
for i := 1; i <= 8; i++ {
assertMessageOffset(t, <-consumer.Messages(), int64(i))
time.Sleep(2 * time.Millisecond)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
Expand Down
2 changes: 1 addition & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestMessageEncoding(t *testing.T) {

message.Value = []byte{}
message.Codec = CompressionGZIP
if runtime.Version() == "go1.8" {
if runtime.Version() == "go1.8" || runtime.Version() == "go1.8.1" {
testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
} else {
testEncodable(t, "empty gzip", &message, emptyGzipMessage)
Expand Down