diff --git a/config.go b/config.go index 264dfa0af..653db45f0 100644 --- a/config.go +++ b/config.go @@ -105,6 +105,11 @@ type Config struct { // Equivalent to the JVM's `fetch.wait.max.ms`. MaxWaitTime time.Duration + // The maximum amount of time the consumer expects a message takes to process for the user. If writing to the Messages channel + // takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the + // Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms. + MaxProcessingTime time.Duration + // Return specifies what channels will be populated. If they are set to true, you must read from // them to prevent deadlock. Return struct { @@ -147,6 +152,7 @@ func NewConfig() *Config { c.Consumer.Fetch.Default = 32768 c.Consumer.Retry.Backoff = 2 * time.Second c.Consumer.MaxWaitTime = 250 * time.Millisecond + c.Consumer.MaxProcessingTime = 100 * time.Millisecond c.Consumer.Return.Errors = false c.ChannelBufferSize = 256 @@ -239,7 +245,9 @@ func (c *Config) Validate() error { case c.Consumer.Fetch.Max < 0: return ConfigurationError("Consumer.Fetch.Max must be >= 0") case c.Consumer.MaxWaitTime < 1*time.Millisecond: - return ConfigurationError("Consumer.MaxWaitTime must be > 1ms") + return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms") + case c.Consumer.MaxProcessingTime <= 0: + return ConfigurationError("Consumer.MaxProcessingTime must be > 0") case c.Consumer.Retry.Backoff < 0: return ConfigurationError("Consumer.Retry.Backoff must be >= 0") } diff --git a/consumer.go b/consumer.go index 8a3ca6f02..43ce3b21b 100644 --- a/consumer.go +++ b/consumer.go @@ -1,6 +1,7 @@ package sarama import ( + "errors" "fmt" "sync" "sync/atomic" @@ -271,13 +272,15 @@ type partitionConsumer struct { feeder chan *FetchResponse trigger, dying chan none - dispatchReason error + responseResult error fetchSize int32 offset int64 highWaterMarkOffset int64 }
+var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing + func (child *partitionConsumer) sendError(err error) { cErr := &ConsumerError{ Topic: child.topic, @@ -401,23 +404,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { } func (child *partitionConsumer) responseFeeder() { + var msgs []*ConsumerMessage + +feederLoop: for response := range child.feeder { - switch err := child.handleResponse(response); err { - case nil: - break - case ErrOffsetOutOfRange: - // there's no point in retrying this it will just fail the same way again - // so shut it down and force the user to choose what to do - Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err) - child.sendError(err) - child.AsyncClose() - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: - // these three are not fatal errors, but do require redispatching - child.dispatchReason = err - default: - // dunno, tell the user and try redispatching - child.sendError(err) - child.dispatchReason = err + msgs, child.responseResult = child.parseResponse(response) + + for i, msg := range msgs { + select { + case child.messages <- msg: + case <-time.After(child.conf.Consumer.MaxProcessingTime): + child.responseResult = errTimedOut + child.broker.acks.Done() + for _, msg = range msgs[i:] { + child.messages <- msg + } + child.broker.input <- child + continue feederLoop + } } child.broker.acks.Done() @@ -427,14 +431,14 @@ func (child *partitionConsumer) responseFeeder() { close(child.errors) } -func (child *partitionConsumer) handleResponse(response *FetchResponse) error { +func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) { block := response.GetBlock(child.topic, child.partition) if block == nil { - return ErrIncompleteResponse + return nil, ErrIncompleteResponse } if block.Err != ErrNoError { - return block.Err + return nil, block.Err } if 
len(block.MsgSet.Messages) == 0 { @@ -453,7 +457,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { } } - return nil + return nil, nil } // we got messages, reset our fetch size in case it was increased for a previous request @@ -461,8 +465,8 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) incomplete := false - atLeastOne := false prelude := true + var messages []*ConsumerMessage for _, msgBlock := range block.MsgSet.Messages { for _, msg := range msgBlock.Messages() { @@ -472,14 +476,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { prelude = false if msg.Offset >= child.offset { - atLeastOne = true - child.messages <- &ConsumerMessage{ + messages = append(messages, &ConsumerMessage{ Topic: child.topic, Partition: child.partition, Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, - } + }) child.offset = msg.Offset + 1 } else { incomplete = true @@ -488,10 +491,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { } - if incomplete || !atLeastOne { - return ErrIncompleteResponse + if incomplete || len(messages) == 0 { + return nil, ErrIncompleteResponse } - return nil + return messages, nil } // brokerConsumer @@ -569,7 +572,10 @@ func (bc *brokerConsumer) subscriptionConsumer() { // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available for newSubscriptions := range bc.newSubscriptions { - bc.updateSubscriptionCache(newSubscriptions) + for _, child := range newSubscriptions { + bc.subscriptions[child] = none{} + Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) + } if len(bc.subscriptions) == 0 { // We're about to be shut down or we're about to receive more subscriptions. 
@@ -591,16 +597,12 @@ func (bc *brokerConsumer) subscriptionConsumer() { child.feeder <- response } bc.acks.Wait() + bc.handleResponses() } } -func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) { - // take new subscriptions, and abandon subscriptions that have been closed - for _, child := range newSubscriptions { - bc.subscriptions[child] = none{} - Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) - } - +func (bc *brokerConsumer) handleResponses() { + // handles the response codes left for us by our subscriptions, and abandons ones that have been closed for child := range bc.subscriptions { select { case <-child.dying: @@ -608,10 +610,34 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC close(child.trigger) delete(bc.subscriptions, child) default: - if child.dispatchReason != nil { + result := child.responseResult + child.responseResult = nil + + switch result { + case nil: + break + case errTimedOut: + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", + bc.broker.ID(), child.topic, child.partition) + delete(bc.subscriptions, child) + case ErrOffsetOutOfRange: + // there's no point in retrying this it will just fail the same way again + // shut it down and force the user to choose what to do + child.sendError(result) + Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) + close(child.trigger) + delete(bc.subscriptions, child) + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: + // not an error, but does need redispatching + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, result) + child.trigger <- none{} + delete(bc.subscriptions, child) + default: + // dunno, tell the user and try redispatching + 
child.sendError(result) Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, child.dispatchReason) - child.dispatchReason = nil + bc.broker.ID(), child.topic, child.partition, result) child.trigger <- none{} delete(bc.subscriptions, child) } diff --git a/consumer_test.go b/consumer_test.go index cad709b53..6617f149e 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) { fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) leader.Returns(fetchResponse) + leader.Returns(fetchResponse) safeClose(t, c1) safeClose(t, c0)