Merge pull request #303 from Shopify/split_consumer_events_channel
Split consumer’s Events() channel into Messages() and Errors()
Willem van Bergen committed Feb 24, 2015
2 parents 1b6daea + ed24dc9 commit cc01cb4
Showing 3 changed files with 149 additions and 60 deletions.
102 changes: 69 additions & 33 deletions consumer.go
@@ -69,17 +69,17 @@ type PartitionConsumerConfig struct {
OffsetMethod OffsetMethod
// Interpreted differently according to the value of OffsetMethod.
OffsetValue int64
// The number of events to buffer in the Events channel. Having this non-zero permits the
// The number of events to buffer in the Messages and Errors channels. Having this non-zero permits the
// consumer to continue fetching messages in the background while client code consumes events,
// greatly improving throughput. The default is 64.
EventBufferSize int
ChannelBufferSize int
}

// NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
func NewPartitionConsumerConfig() *PartitionConsumerConfig {
return &PartitionConsumerConfig{
DefaultFetchSize: 32768,
EventBufferSize: 64,
DefaultFetchSize: 32768,
ChannelBufferSize: 64,
}
}
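
A minimal sketch of how the renamed field might be used when opening a partition consumer; the topic name, partition, and the pre-existing master consumer here are illustrative assumptions, not part of this commit:

    config := NewPartitionConsumerConfig()
    config.ChannelBufferSize = 256 // buffer more events so fetching can run ahead of processing

    consumer, err := master.ConsumePartition("my_topic", 0, config)
    if err != nil {
        panic(err)
    }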

@@ -94,30 +94,40 @@ func (config *PartitionConsumerConfig) Validate() error {
return ConfigurationError("Invalid MaxMessageSize")
}

if config.EventBufferSize < 0 {
return ConfigurationError("Invalid EventBufferSize")
if config.ChannelBufferSize < 0 {
return ConfigurationError("Invalid ChannelBufferSize")
}

return nil
}

// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
// a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
type ConsumerEvent struct {
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Err error
}

// ConsumeErrors is a type that wraps a batch of "ConsumerEvent"s and implements the Error interface.
// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
Topic string
Partition int32
Err error
}

func (ce ConsumerError) Error() string {
return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// ConsumerErrors is a type that wraps a batch of errors and implements the error interface.
// It can be returned from the PartitionConsumer's Close method to avoid the need to manually drain errors
// when stopping.
type ConsumeErrors []*ConsumerEvent
type ConsumerErrors []*ConsumerError

func (ce ConsumeErrors) Error() string {
return fmt.Sprintf("kafka: %d errors when consuming", len(ce))
func (ce ConsumerErrors) Error() string {
return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}
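
Because Close returns a plain error, a caller that wants the individual failures can type-assert it to ConsumerErrors. A minimal sketch, assuming an open consumer and the standard library's log package:

    if err := consumer.Close(); err != nil {
        if consumerErrors, ok := err.(ConsumerErrors); ok {
            for _, consumeErr := range consumerErrors {
                log.Printf("error consuming %s/%d: %v", consumeErr.Topic, consumeErr.Partition, consumeErr.Err)
            }
        } else {
            log.Println(err)
        }
    }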

// Consumer manages PartitionConsumers which process Kafka messages from brokers.
@@ -171,7 +181,8 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, config *Parti
config: *config,
topic: topic,
partition: partition,
events: make(chan *ConsumerEvent, config.EventBufferSize),
messages: make(chan *ConsumerMessage, config.ChannelBufferSize),
errors: make(chan *ConsumerError, config.ChannelBufferSize),
trigger: make(chan none, 1),
dying: make(chan none),
fetchSize: config.DefaultFetchSize,
@@ -267,22 +278,24 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
// You have to read from both the Messages and Errors channels to prevent the consumer from eventually deadlocking.
type PartitionConsumer struct {
consumer *Consumer
config PartitionConsumerConfig
topic string
partition int32

broker *Broker
events chan *ConsumerEvent
messages chan *ConsumerMessage
errors chan *ConsumerError
trigger, dying chan none

fetchSize int32
offset int64
}

func (child *PartitionConsumer) sendError(err error) {
child.events <- &ConsumerEvent{
child.errors <- &ConsumerError{
Topic: child.topic,
Partition: child.partition,
Err: err,
@@ -318,7 +331,8 @@ func (child *PartitionConsumer) dispatcher() {
child.consumer.unrefBrokerConsumer(child.broker)
}
child.consumer.removeChild(child)
close(child.events)
close(child.messages)
close(child.errors)
}

func (child *PartitionConsumer) dispatch() error {
@@ -361,26 +375,48 @@ func (child *PartitionConsumer) chooseStartingOffset() (err error) {
return err
}

// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
func (child *PartitionConsumer) Events() <-chan *ConsumerEvent {
return child.events
// Messages returns the read channel for the messages that are returned by the broker.
func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage {
return child.messages
}

// Close stops the PartitionConsumer from fetching messages. It is required to call this function before a
// consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (child *PartitionConsumer) Close() error {
// Errors returns the read channel for any errors that occurred while consuming the partition.
// You have to read this channel to prevent the consumer from deadlocking. Under no circumstances
// will the partition consumer shut down by itself; it will just wait until it is able to continue
// consuming messages. If you want to shut down your consumer, you will have to trigger it yourself
// by consuming this channel and calling Close or AsyncClose when appropriate.
func (child *PartitionConsumer) Errors() <-chan *ConsumerError {
return child.errors
}

// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
// after which you should wait until the 'messages' and 'errors' channels are drained.
// It is required to call this function, or Close, before a consumer object passes out of scope,
// as it will otherwise leak memory. You must call this before calling Close on the underlying
// client.
func (child *PartitionConsumer) AsyncClose() {
// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'events' channel
// (alternatively, if the child is already at the dispatcher for some reason, that will also just
// close itself)
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
// also just close itself)
close(child.dying)
}
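
A minimal sketch of the shutdown sequence this implies — trigger the shutdown, then drain both channels until they are closed (mirroring what Close does below; the error handling is illustrative):

    consumer.AsyncClose()

    // Drain remaining messages in the background so the errors loop cannot block.
    go func() {
        for range consumer.Messages() {
            // discard messages still buffered at shutdown
        }
    }()

    for err := range consumer.Errors() {
        fmt.Println("error during shutdown:", err)
    }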

var errors ConsumeErrors
for event := range child.events {
if event.Err != nil {
errors = append(errors, event)
// Close stops the PartitionConsumer from fetching messages. It is required to call this function,
// or AsyncClose, before a consumer object passes out of scope, as it will otherwise leak memory. You must
// call this before calling Close on the underlying client.
func (child *PartitionConsumer) Close() error {
child.AsyncClose()

go withRecover(func() {
for range child.messages {
// drain
}
})

var errors ConsumerErrors
for err := range child.errors {
errors = append(errors, err)
}

if len(errors) > 0 {
@@ -572,7 +608,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe

if msg.Offset >= child.offset {
atLeastOne = true
child.events <- &ConsumerEvent{
child.messages <- &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
97 changes: 74 additions & 23 deletions consumer_test.go
@@ -57,12 +57,13 @@ func TestConsumerOffsetManual(t *testing.T) {
seedBroker.Close()

for i := 0; i < 10; i++ {
event := <-consumer.Events()
if event.Err != nil {
t.Error(event.Err)
}
if event.Offset != int64(i+1234) {
t.Error("Incorrect message offset!")
select {
case message := <-consumer.Messages():
if message.Offset != int64(i+1234) {
t.Error("Incorrect message offset!")
}
case err := <-consumer.Errors():
t.Error(err)
}
}

@@ -152,11 +153,8 @@ func TestConsumerFunnyOffsets(t *testing.T) {
config.OffsetValue = 2
consumer, err := master.ConsumePartition("my_topic", 0, config)

event := <-consumer.Events()
if event.Err != nil {
t.Error(event.Err)
}
if event.Offset != 3 {
message := <-consumer.Messages()
if message.Offset != 3 {
t.Error("Incorrect message offset!")
}

@@ -201,17 +199,21 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
if err != nil {
t.Error(err)
}

go func(c *PartitionConsumer) {
for err := range c.Errors() {
t.Error(err)
}
}(consumer)

wg.Add(1)
go func(partition int32, c *PartitionConsumer) {
for i := 0; i < 10; i++ {
event := <-consumer.Events()
if event.Err != nil {
t.Error(event.Err, i, partition)
}
if event.Offset != int64(i) {
t.Error("Incorrect message offset!", i, partition, event.Offset)
message := <-c.Messages()
if message.Offset != int64(i) {
t.Error("Incorrect message offset!", i, partition, message.Offset)
}
if event.Partition != partition {
if message.Partition != partition {
t.Error("Incorrect message partition!")
}
}
@@ -292,7 +294,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
safeClose(t, client)
}

func ExampleConsumer() {
func ExampleConsumerWithSelect() {
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
if err != nil {
panic(err)
@@ -321,10 +323,9 @@ func ExampleConsumer() {
consumerLoop:
for {
select {
case event := <-consumer.Events():
if event.Err != nil {
panic(event.Err)
}
case err := <-consumer.Errors():
panic(err)
case <-consumer.Messages():
msgCount++
case <-time.After(5 * time.Second):
fmt.Println("> timed out")
@@ -333,3 +334,53 @@ consumerLoop:
}
fmt.Println("Got", msgCount, "messages.")
}

func ExampleConsumerWithGoroutines() {
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
if err != nil {
panic(err)
} else {
fmt.Println("> connected")
}
defer client.Close()

master, err := NewConsumer(client, nil)
if err != nil {
panic(err)
} else {
fmt.Println("> master consumer ready")
}

consumer, err := master.ConsumePartition("my_topic", 0, nil)
if err != nil {
panic(err)
} else {
fmt.Println("> consumer ready")
}
defer consumer.Close()

var (
wg sync.WaitGroup
msgCount int
)

wg.Add(1)
go func() {
defer wg.Done()
for message := range consumer.Messages() {
fmt.Printf("Consumed message with offset %d", message.Offset)
msgCount++
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for err := range consumer.Errors() {
fmt.Println(err)
}
}()

wg.Wait()
fmt.Println("Got", msgCount, "messages.")
}
10 changes: 6 additions & 4 deletions functional_test.go
@@ -171,15 +171,17 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
}
safeClose(t, producer)

events := consumer.Events()
for i := 1; i <= TestBatchSize; i++ {
select {
case <-time.After(10 * time.Second):
t.Fatal("Not received any more events in the last 10 seconds.")

case event := <-events:
if string(event.Value) != fmt.Sprintf("testing %d", i) {
t.Fatalf("Unexpected message with index %d: %s", i, event.Value)
case err := <-consumer.Errors():
t.Error(err)

case message := <-consumer.Messages():
if string(message.Value) != fmt.Sprintf("testing %d", i) {
t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
}
}

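
Taken together, the split means callers migrate from one combined event loop to two channels. A hedged before-and-after sketch, where handleMessage is a hypothetical stand-in for whatever the application does with a message:

    // Before: messages and errors arrived interleaved on the single Events() channel.
    for event := range consumer.Events() {
        if event.Err != nil {
            log.Println(event.Err)
            continue
        }
        handleMessage(event) // hypothetical application callback
    }

    // After: errors are consumed separately, typically in their own goroutine.
    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()
    for message := range consumer.Messages() {
        handleMessage(message) // same hypothetical callback, now receiving a *ConsumerMessage
    }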
