From 8a91a50ecdad201cfb69e0136623f345fd1ed3c2 Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Mon, 27 Jul 2015 16:38:11 -0400
Subject: [PATCH 1/3] consumer: fix another race pointed out by Maxim

Take the previous refactor to its logical conclusion by handling *all* the
error logic in the brokerConsumer, not the responseFeeder. This fixes the
race to close the dying channel (since the brokerConsumer can just close the
trigger instead, as it has ownership).

At the same time, refactor `updateSubscriptionCache` into `handleResponses`,
and inline the "new subscriptions" bit into the main loop; otherwise we end
up processing the previous iteration's results at the very beginning of the
next iteration, rather than at the very end of the current one.
---
 consumer.go | 61 +++++++++++++++++++++++++++--------------------------
 1 file changed, 31 insertions(+), 30 deletions(-)

diff --git a/consumer.go b/consumer.go
index 8a3ca6f02..20adc862d 100644
--- a/consumer.go
+++ b/consumer.go
@@ -271,7 +271,7 @@ type partitionConsumer struct {
 	feeder chan *FetchResponse
 
 	trigger, dying chan none
-	dispatchReason error
+	responseResult error
 
 	fetchSize int32
 	offset    int64
@@ -402,24 +402,7 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 
 func (child *partitionConsumer) responseFeeder() {
 	for response := range child.feeder {
-		switch err := child.handleResponse(response); err {
-		case nil:
-			break
-		case ErrOffsetOutOfRange:
-			// there's no point in retrying this; it will just fail the same way again,
-			// so shut it down and force the user to choose what to do
-			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
-			child.sendError(err)
-			child.AsyncClose()
-		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
-			// these three are not fatal errors, but do require redispatching
-			child.dispatchReason = err
-		default:
-			// dunno, tell the user and try redispatching
-			child.sendError(err)
-			child.dispatchReason = err
-		}
-
+		child.responseResult = child.handleResponse(response)
 		child.broker.acks.Done()
 	}
 
@@ -569,7 +552,10 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 	// the subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
 	for newSubscriptions := range bc.newSubscriptions {
-		bc.updateSubscriptionCache(newSubscriptions)
+		for _, child := range newSubscriptions {
+			bc.subscriptions[child] = none{}
+			Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+		}
 
 		if len(bc.subscriptions) == 0 {
 			// We're about to be shut down or we're about to receive more subscriptions.
@@ -591,16 +577,12 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 			child.feeder <- response
 		}
 		bc.acks.Wait()
+		bc.handleResponses()
 	}
 }
 
-func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
-	// take new subscriptions, and abandon subscriptions that have been closed
-	for _, child := range newSubscriptions {
-		bc.subscriptions[child] = none{}
-		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
-	}
-
+func (bc *brokerConsumer) handleResponses() {
+	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
 	for child := range bc.subscriptions {
 		select {
 		case <-child.dying:
@@ -608,13 +590,32 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			if child.dispatchReason != nil {
+			switch child.responseResult {
+			case nil:
+				break
+			case ErrOffsetOutOfRange:
+				// there's no point in retrying this; it will just fail the same way again,
+				// so shut it down and force the user to choose what to do
+				child.sendError(child.responseResult)
+				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult)
+				close(child.trigger)
+				delete(bc.subscriptions, child)
+			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+				// not an error, but does need redispatching
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+				child.trigger <- none{}
+				delete(bc.subscriptions, child)
+			default:
+				// dunno, tell the user and try redispatching
+				child.sendError(child.responseResult)
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
-				child.dispatchReason = nil
+					bc.broker.ID(), child.topic, child.partition, child.responseResult)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			}
+
+			child.responseResult = nil
 		}
 	}
 }

From 7e4b74b20d08b433b0bad2892e86900403db21d7 Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Wed, 15 Jul 2015 13:07:13 -0400
Subject: [PATCH 2/3] Move the consumer's channel send slightly

Prep for unblocking consumers that are not being drained.
---
 consumer.go | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/consumer.go b/consumer.go
index 20adc862d..6c7eaf4da 100644
--- a/consumer.go
+++ b/consumer.go
@@ -401,8 +401,15 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 }
 
 func (child *partitionConsumer) responseFeeder() {
+	var msgs []*ConsumerMessage
+
 	for response := range child.feeder {
-		child.responseResult = child.handleResponse(response)
+		msgs, child.responseResult = child.parseResponse(response)
+
+		for _, msg := range msgs {
+			child.messages <- msg
+		}
+
 		child.broker.acks.Done()
 	}
 
@@ -410,14 +417,14 @@ func (child *partitionConsumer) responseFeeder() {
 	close(child.errors)
 }
 
-func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
+func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
-		return ErrIncompleteResponse
+		return nil, ErrIncompleteResponse
 	}
 
 	if block.Err != ErrNoError {
-		return block.Err
+		return nil, block.Err
 	}
 
 	if len(block.MsgSet.Messages) == 0 {
@@ -436,7 +443,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 			}
 		}
 
-		return nil
+		return nil, nil
 	}
 
 	// we got messages, reset our fetch size in case it was increased for a previous request
@@ -444,8 +451,8 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
 
 	incomplete := false
-	atLeastOne := false
 	prelude := true
+	var messages []*ConsumerMessage
 
 	for _, msgBlock := range block.MsgSet.Messages {
 		for _, msg := range msgBlock.Messages() {
@@ -455,14 +462,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 			prelude = false
 
 			if msg.Offset >= child.offset {
-				atLeastOne = true
-				child.messages <- &ConsumerMessage{
+				messages = append(messages, &ConsumerMessage{
 					Topic:     child.topic,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,
 					Value:     msg.Msg.Value,
 					Offset:    msg.Offset,
-				}
+				})
 				child.offset = msg.Offset + 1
 			} else {
 				incomplete = true
@@ -471,10 +477,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 		}
 	}
 
-	if incomplete || !atLeastOne {
-		return ErrIncompleteResponse
+	if incomplete || len(messages) == 0 {
+		return nil, ErrIncompleteResponse
 	}
-	return nil
+	return messages, nil
 }
 
 // brokerConsumer

From 292f3b0aa1d7adfb715ebef42f7f95a1c8748a5f Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Wed, 15 Jul 2015 14:44:18 -0400
Subject: [PATCH 3/3] consumer: don't block on undrained partitions

If a partitionConsumer fills up and is not being drained (or is taking a
long time), remove its subscription until it can proceed again, in order
not to block other partitions which may still be making progress.
---
 config.go        | 10 +++++++++-
 consumer.go      | 39 +++++++++++++++++++++++++++----------
 consumer_test.go |  1 +
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/config.go b/config.go
index 264dfa0af..653db45f0 100644
--- a/config.go
+++ b/config.go
@@ -105,6 +105,11 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
+		// The maximum amount of time the consumer expects a message to take to process for the user. If writing to the Messages
+		// channel takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that,
+		// since the Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
+		MaxProcessingTime time.Duration
+
 		// Return specifies what channels will be populated. If they are set to true, you must read from
 		// them to prevent deadlock.
 		Return struct {
@@ -147,6 +152,7 @@ func NewConfig() *Config {
 	c.Consumer.Fetch.Default = 32768
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
 
 	c.ChannelBufferSize = 256
@@ -239,7 +245,9 @@ func (c *Config) Validate() error {
 	case c.Consumer.Fetch.Max < 0:
 		return ConfigurationError("Consumer.Fetch.Max must be >= 0")
 	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
-		return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
+		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
+	case c.Consumer.MaxProcessingTime <= 0:
+		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	}
diff --git a/consumer.go b/consumer.go
index 6c7eaf4da..43ce3b21b 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -278,6 +279,8 @@ type partitionConsumer struct {
 	highWaterMarkOffset int64
 }
 
+var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
+
 func (child *partitionConsumer) sendError(err error) {
 	cErr := &ConsumerError{
 		Topic:     child.topic,
@@ -403,11 +406,22 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
 
+feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
-		for _, msg := range msgs {
-			child.messages <- msg
+		for i, msg := range msgs {
+			select {
+			case child.messages <- msg:
+			case <-time.After(child.conf.Consumer.MaxProcessingTime):
+				child.responseResult = errTimedOut
+				child.broker.acks.Done()
+				for _, msg = range msgs[i:] {
+					child.messages <- msg
+				}
+				child.broker.input <- child
+				continue feederLoop
+			}
 		}
 
 		child.broker.acks.Done()
@@ -596,32 +610,37 @@ func (bc *brokerConsumer) handleResponses() {
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			switch child.responseResult {
+			result := child.responseResult
+			child.responseResult = nil
+
+			switch result {
 			case nil:
 				break
+			case errTimedOut:
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
+					bc.broker.ID(), child.topic, child.partition)
+				delete(bc.subscriptions, child)
 			case ErrOffsetOutOfRange:
 				// there's no point in retrying this; it will just fail the same way again,
 				// so shut it down and force the user to choose what to do
-				child.sendError(child.responseResult)
-				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult)
+				child.sendError(result)
+				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
 				close(child.trigger)
 				delete(bc.subscriptions, child)
 			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
 				// not an error, but does need redispatching
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			default:
 				// dunno, tell the user and try redispatching
-				child.sendError(child.responseResult)
+				child.sendError(result)
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			}
-
-			child.responseResult = nil
 		}
 	}
 }
diff --git a/consumer_test.go b/consumer_test.go
index cad709b53..6617f149e 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
 	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
 	leader.Returns(fetchResponse)
+	leader.Returns(fetchResponse)
 
 	safeClose(t, c1)
 	safeClose(t, c0)
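
Reviewer's note, not part of the patches: a minimal usage sketch of the new MaxProcessingTime knob against sarama's public consumer API. The broker address, topic name, and handleMessage helper are hypothetical placeholders. Per the new doc comment, with the default ChannelBufferSize of 256, a MaxProcessingTime of 200ms gives a slow consumer roughly 200ms * 256 of grace before its subscription is dropped and later redispatched.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Allow each message roughly 200ms in user code before this partition's
	// subscription is temporarily abandoned; it is redispatched once the
	// buffered Messages channel drains enough to make progress again.
	config.Consumer.MaxProcessingTime = 200 * time.Millisecond

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) // placeholder address
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest) // placeholder topic
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		handleMessage(msg) // placeholder for application work; aim to stay under MaxProcessingTime per message
	}
}

func handleMessage(msg *sarama.ConsumerMessage) {
	log.Printf("partition %d offset %d: %s", msg.Partition, msg.Offset, msg.Value)
}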