Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't require all consumers drained #485

Merged
merged 3 commits into from
Aug 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ type Config struct {
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// The maximum amount of time the consumer expects a message to take to process for the user. If writing to the Messages channel
// takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the
// Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
MaxProcessingTime time.Duration

// Return specifies what channels will be populated. If they are set to true, you must read from
// them to prevent deadlock.
Return struct {
Expand Down Expand Up @@ -147,6 +152,7 @@ func NewConfig() *Config {
c.Consumer.Fetch.Default = 32768
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false

c.ChannelBufferSize = 256
Expand Down Expand Up @@ -239,7 +245,9 @@ func (c *Config) Validate() error {
case c.Consumer.Fetch.Max < 0:
return ConfigurationError("Consumer.Fetch.Max must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
case c.Consumer.MaxProcessingTime <= 0:
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
}
Expand Down
104 changes: 65 additions & 39 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -271,13 +272,15 @@ type partitionConsumer struct {
feeder chan *FetchResponse

trigger, dying chan none
dispatchReason error
responseResult error

fetchSize int32
offset int64
highWaterMarkOffset int64
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, all the configuration errors (see config.go above) are capitalized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, I've submitted a PR to fix them :)


func (child *partitionConsumer) sendError(err error) {
cErr := &ConsumerError{
Topic: child.topic,
Expand Down Expand Up @@ -401,23 +404,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
}

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage

feederLoop:
for response := range child.feeder {
switch err := child.handleResponse(response); err {
case nil:
break
case ErrOffsetOutOfRange:
// there's no point in retrying this; it will just fail the same way again,
// so shut it down and force the user to choose what to do
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
child.sendError(err)
child.AsyncClose()
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dispatchReason = err
default:
// dunno, tell the user and try redispatching
child.sendError(err)
child.dispatchReason = err
msgs, child.responseResult = child.parseResponse(response)

for i, msg := range msgs {
select {
case child.messages <- msg:
case <-time.After(child.conf.Consumer.MaxProcessingTime):
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
}
}

child.broker.acks.Done()
Expand All @@ -427,14 +431,14 @@ func (child *partitionConsumer) responseFeeder() {
close(child.errors)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return ErrIncompleteResponse
return nil, ErrIncompleteResponse
}

if block.Err != ErrNoError {
return block.Err
return nil, block.Err
}

if len(block.MsgSet.Messages) == 0 {
Expand All @@ -453,16 +457,16 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
}
}

return nil
return nil, nil
}

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
prelude := true
var messages []*ConsumerMessage
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
Expand All @@ -472,14 +476,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
prelude = false

if msg.Offset >= child.offset {
atLeastOne = true
child.messages <- &ConsumerMessage{
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
}
})
child.offset = msg.Offset + 1
} else {
incomplete = true
Expand All @@ -488,10 +491,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {

}

if incomplete || !atLeastOne {
return ErrIncompleteResponse
if incomplete || len(messages) == 0 {
return nil, ErrIncompleteResponse
}
return nil
return messages, nil
}

// brokerConsumer
Expand Down Expand Up @@ -569,7 +572,10 @@ func (bc *brokerConsumer) subscriptionConsumer() {

// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptionCache(newSubscriptions)
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
Expand All @@ -591,27 +597,47 @@ func (bc *brokerConsumer) subscriptionConsumer() {
child.feeder <- response
}
bc.acks.Wait()
bc.handleResponses()
}
}

func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
// take new subscriptions, and abandon subscriptions that have been closed
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
select {
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
if child.dispatchReason != nil {
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
break
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
delete(bc.subscriptions, child)
case ErrOffsetOutOfRange:
// there's no point in retrying this; it will just fail the same way again,
// so shut it down and force the user to choose what to do
child.sendError(result)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
close(child.trigger)
delete(bc.subscriptions, child)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
// dunno, tell the user and try redispatching
child.sendError(result)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
child.dispatchReason = nil
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
}
Expand Down
1 change: 1 addition & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)
leader.Returns(fetchResponse)

safeClose(t, c1)
safeClose(t, c0)
Expand Down