diff --git a/consumer.go b/consumer.go
index 203e9c32a..3390e5e29 100644
--- a/consumer.go
+++ b/consumer.go
@@ -69,17 +69,17 @@ type PartitionConsumerConfig struct {
 	OffsetMethod OffsetMethod
 	// Interpreted differently according to the value of OffsetMethod.
 	OffsetValue int64
-	// The number of events to buffer in the Events channel. Having this non-zero permits the
+	// The number of events to buffer in the Messages and Errors channels. Having this non-zero permits the
 	// consumer to continue fetching messages in the background while client code consumes events,
 	// greatly improving throughput. The default is 64.
-	EventBufferSize int
+	ChannelBufferSize int
 }
 
 // NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
 func NewPartitionConsumerConfig() *PartitionConsumerConfig {
 	return &PartitionConsumerConfig{
-		DefaultFetchSize: 32768,
-		EventBufferSize:  64,
+		DefaultFetchSize:  32768,
+		ChannelBufferSize: 64,
 	}
 }
 
@@ -94,30 +94,40 @@ func (config *PartitionConsumerConfig) Validate() error {
 		return ConfigurationError("Invalid MaxMessageSize")
 	}
 
-	if config.EventBufferSize < 0 {
-		return ConfigurationError("Invalid EventBufferSize")
+	if config.ChannelBufferSize < 0 {
+		return ConfigurationError("Invalid ChannelBufferSize")
 	}
 
 	return nil
 }
 
-// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
-// a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
-type ConsumerEvent struct {
+// ConsumerMessage encapsulates a Kafka message returned by the consumer.
+type ConsumerMessage struct {
 	Key, Value []byte
 	Topic      string
 	Partition  int32
 	Offset     int64
-	Err        error
 }
 
-// ConsumeErrors is a type that wraps a batch of "ConsumerEvent"s and implements the Error interface.
+// ConsumerError is what is provided to the user when an error occurs.
+// It wraps an error and includes the topic and partition.
+type ConsumerError struct {
+	Topic     string
+	Partition int32
+	Err       error
+}
+
+func (ce ConsumerError) Error() string {
+	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
+}
+
+// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
 // It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
 // when stopping.
-type ConsumeErrors []*ConsumerEvent
+type ConsumerErrors []*ConsumerError
 
-func (ce ConsumeErrors) Error() string {
-	return fmt.Sprintf("kafka: %d errors when consuming", len(ce))
+func (ce ConsumerErrors) Error() string {
+	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
 }
 
 // Consumer manages PartitionConsumers which process Kafka messages from brokers.
@@ -171,7 +181,8 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, config *Parti
 		config:    *config,
 		topic:     topic,
 		partition: partition,
-		events:    make(chan *ConsumerEvent, config.EventBufferSize),
+		messages:  make(chan *ConsumerMessage, config.ChannelBufferSize),
+		errors:    make(chan *ConsumerError, config.ChannelBufferSize),
 		trigger:   make(chan none, 1),
 		dying:     make(chan none),
 		fetchSize: config.DefaultFetchSize,
@@ -267,6 +278,7 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
+// You have to read from both the Messages and Errors channels to prevent the consumer from eventually deadlocking.
 type PartitionConsumer struct {
 	consumer *Consumer
 	config   PartitionConsumerConfig
@@ -274,7 +286,8 @@ type PartitionConsumer struct {
 	partition int32
 	broker    *Broker
 
-	events         chan *ConsumerEvent
+	messages       chan *ConsumerMessage
+	errors         chan *ConsumerError
 	trigger, dying chan none
 	fetchSize      int32
@@ -282,7 +295,7 @@ type PartitionConsumer struct {
 }
 
 func (child *PartitionConsumer) sendError(err error) {
-	child.events <- &ConsumerEvent{
+	child.errors <- &ConsumerError{
 		Topic:     child.topic,
 		Partition: child.partition,
 		Err:       err,
@@ -318,7 +331,8 @@ func (child *PartitionConsumer) dispatcher() {
 		child.consumer.unrefBrokerConsumer(child.broker)
 	}
 	child.consumer.removeChild(child)
-	close(child.events)
+	close(child.messages)
+	close(child.errors)
 }
 
 func (child *PartitionConsumer) dispatch() error {
@@ -361,26 +375,48 @@ func (child *PartitionConsumer) chooseStartingOffset() (err error) {
 	return err
 }
 
-// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
-func (child *PartitionConsumer) Events() <-chan *ConsumerEvent {
-	return child.events
+// Messages returns the read channel for the messages that are returned by the broker.
+func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage {
+	return child.messages
 }
 
-// Close stops the PartitionConsumer from fetching messages. It is required to call this function before a
-// consumer object passes out of scope, as it will otherwise leak memory. You must call this before
-// calling Close on the underlying client.
-func (child *PartitionConsumer) Close() error {
+// Errors returns the read channel for any errors that occurred while consuming the partition.
+// You have to read this channel to prevent the consumer from deadlocking. Under no circumstances
+// will the partition consumer shut down by itself; it will just wait until it is able to continue
+// consuming messages. If you want to shut down your consumer, you have to trigger that yourself
+// by reading this channel and calling Close or AsyncClose when appropriate.
+func (child *PartitionConsumer) Errors() <-chan *ConsumerError {
+	return child.errors
+}
+
+// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
+// after which you should wait until the 'messages' and 'errors' channels are drained.
+// It is required to call this function, or Close, before a consumer object passes out of scope,
+// as it will otherwise leak memory. You must call this before calling Close on the underlying
+// client.
+func (child *PartitionConsumer) AsyncClose() {
 	// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
-	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'events' channel
-	// (alternatively, if the child is already at the dispatcher for some reason, that will also just
-	// close itself)
+	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
+	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
+	// also just close itself)
 	close(child.dying)
+}
 
-	var errors ConsumeErrors
-	for event := range child.events {
-		if event.Err != nil {
-			errors = append(errors, event)
+// Close stops the PartitionConsumer from fetching messages. It is required to call this function,
+// or AsyncClose, before a consumer object passes out of scope, as it will otherwise leak memory. You must
+// call this before calling Close on the underlying client.
+func (child *PartitionConsumer) Close() error {
+	child.AsyncClose()
+
+	go withRecover(func() {
+		for _ = range child.messages {
+			// drain
 		}
+	})
+
+	var errors ConsumerErrors
+	for err := range child.errors {
+		errors = append(errors, err)
 	}
 
 	if len(errors) > 0 {
@@ -572,7 +608,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 			if msg.Offset >= child.offset {
 				atLeastOne = true
-				child.events <- &ConsumerEvent{
+				child.messages <- &ConsumerMessage{
 					Topic:     child.topic,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,
diff --git a/consumer_test.go b/consumer_test.go
index 08940aeca..95d16ac49 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -57,12 +57,13 @@ func TestConsumerOffsetManual(t *testing.T) {
 	seedBroker.Close()
 
 	for i := 0; i < 10; i++ {
-		event := <-consumer.Events()
-		if event.Err != nil {
-			t.Error(event.Err)
-		}
-		if event.Offset != int64(i+1234) {
-			t.Error("Incorrect message offset!")
+		select {
+		case message := <-consumer.Messages():
+			if message.Offset != int64(i+1234) {
+				t.Error("Incorrect message offset!")
+			}
+		case err := <-consumer.Errors():
+			t.Error(err)
 		}
 	}
 
@@ -152,11 +153,8 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	config.OffsetValue = 2
 	consumer, err := master.ConsumePartition("my_topic", 0, config)
 
-	event := <-consumer.Events()
-	if event.Err != nil {
-		t.Error(event.Err)
-	}
-	if event.Offset != 3 {
+	message := <-consumer.Messages()
+	if message.Offset != 3 {
 		t.Error("Incorrect message offset!")
 	}
 
@@ -201,17 +199,21 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
+
+		go func(c *PartitionConsumer) {
+			for err := range c.Errors() {
+				t.Error(err)
+			}
+		}(consumer)
+
 		wg.Add(1)
 		go func(partition int32, c *PartitionConsumer) {
 			for i := 0; i < 10; i++ {
-				event := <-consumer.Events()
-				if event.Err != nil {
-					t.Error(event.Err, i, partition)
-				}
-				if event.Offset != int64(i) {
-					t.Error("Incorrect message offset!", i, partition, event.Offset)
+				message := <-consumer.Messages()
+				if message.Offset != int64(i) {
+					t.Error("Incorrect message offset!", i, partition, message.Offset)
 				}
-				if event.Partition != partition {
+				if message.Partition != partition {
 					t.Error("Incorrect message partition!")
 				}
 			}
@@ -292,7 +294,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	safeClose(t, client)
 }
 
-func ExampleConsumer() {
+func ExampleConsumerWithSelect() {
 	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	}
@@ -321,10 +323,9 @@ func ExampleConsumer() {
 consumerLoop:
 	for {
 		select {
-		case event := <-consumer.Events():
-			if event.Err != nil {
-				panic(event.Err)
-			}
+		case err := <-consumer.Errors():
+			panic(err)
+		case <-consumer.Messages():
 			msgCount++
 		case <-time.After(5 * time.Second):
 			fmt.Println("> timed out")
@@ -333,3 +334,53 @@ consumerLoop:
 	}
 	fmt.Println("Got", msgCount, "messages.")
 }
+
+func ExampleConsumerWithGoroutines() {
+	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> connected")
+	}
+	defer client.Close()
+
+	master, err := NewConsumer(client, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> master consumer ready")
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> consumer ready")
+	}
+	defer consumer.Close()
+
+	var (
+		wg       sync.WaitGroup
+		msgCount int
+	)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for message := range consumer.Messages() {
+			fmt.Printf("Consumed message with offset %d\n", message.Offset)
+			msgCount++
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for err := range consumer.Errors() {
+			fmt.Println(err)
+		}
+	}()
+
+	wg.Wait()
+	fmt.Println("Got", msgCount, "messages.")
+}
diff --git a/functional_test.go b/functional_test.go
index c38f55fd9..42f473cce 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -171,15 +171,17 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	}
 	safeClose(t, producer)
 
-	events := consumer.Events()
 	for i := 1; i <= TestBatchSize; i++ {
 		select {
 		case <-time.After(10 * time.Second):
 			t.Fatal("Not received any more events in the last 10 seconds.")
-		case event := <-events:
-			if string(event.Value) != fmt.Sprintf("testing %d", i) {
-				t.Fatalf("Unexpected message with index %d: %s", i, event.Value)
+		case err := <-consumer.Errors():
+			t.Error(err)
+
+		case message := <-consumer.Messages():
+			if string(message.Value) != fmt.Sprintf("testing %d", i) {
+				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
 			}
 		}
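
For reference, a minimal sketch of the shutdown flow the new AsyncClose and Errors doc comments describe, since the patched examples only exercise the deferred Close path. This is not part of the diff: the helper name, topic, and shutdown channel are illustrative assumptions, and it is assumed to compile alongside the patched package (only the stdlib "sync" import is needed beyond what the package already uses).

// consumeUntilShutdown is a hypothetical helper (not part of this patch)
// showing the AsyncClose contract: trigger the shutdown, then drain both
// the Messages and Errors channels until the dispatcher closes them.
func consumeUntilShutdown(master *Consumer, shutdown <-chan struct{}) (int, error) {
	consumer, err := master.ConsumePartition("my_topic", 0, nil) // placeholder topic
	if err != nil {
		return 0, err
	}

	// AsyncClose returns immediately; the messages and errors channels are
	// closed once the dispatcher exits, which ends both range loops below.
	go func() {
		<-shutdown
		consumer.AsyncClose()
	}()

	var (
		wg      sync.WaitGroup
		lastErr error
	)

	// Per the Errors() doc comment, this channel must be drained or the
	// consumer will eventually deadlock.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range consumer.Errors() {
			lastErr = err
		}
	}()

	msgCount := 0
	for message := range consumer.Messages() {
		_ = message.Value // process the message here
		msgCount++
	}

	wg.Wait() // lastErr is safe to read once both drain loops have finished
	return msgCount, lastErr
}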