From f66f2deeff746250d8f89d06fc005783a0e6a5f2 Mon Sep 17 00:00:00 2001 From: Mathieu Payeur Levallois Date: Thu, 20 Oct 2016 03:50:02 -0400 Subject: [PATCH 1/3] Add Consumer.HighWaterMarks() --- consumer.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/consumer.go b/consumer.go index 5271e21de..af93e8a81 100644 --- a/consumer.go +++ b/consumer.go @@ -63,6 +63,9 @@ type Consumer interface { // or OffsetOldest ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) + // HighWaterMarks returns the current high water marks for each topic and partitions + HighWaterMarks() map[string]map[int32]int64 + // Close shuts down the consumer. It must be called after all child // PartitionConsumers have already been closed. Close() error @@ -163,6 +166,22 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) return child, nil } +func (c *consumer) HighWaterMarks() map[string]map[int32]int64 { + c.lock.Lock() + defer c.lock.Unlock() + + hwms := make(map[string]map[int32]int64) + for topic, p := range c.children { + hwm := make(map[int32]int64, len(p)) + for partition, pc := range p { + hwm[partition] = pc.HighWaterMarkOffset() + } + hwms[topic] = hwm + } + + return hwms +} + func (c *consumer) addChild(child *partitionConsumer) error { c.lock.Lock() defer c.lock.Unlock() From faf28b8ed960869124e7c00373241d25ce3fd500 Mon Sep 17 00:00:00 2001 From: Mathieu Payeur Levallois Date: Thu, 20 Oct 2016 23:32:48 -0400 Subject: [PATCH 2/3] fix mocks --- mocks/consumer.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mocks/consumer.go b/mocks/consumer.go index 09657c0ee..a524190a0 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -96,6 +96,22 @@ func (c *Consumer) Partitions(topic string) ([]int32, error) { return c.metadata[topic], nil } +func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { + c.l.Lock() + defer c.l.Unlock() + + hwms := make(map[string]map[int32]int64, len(c.partitionConsumers)) + for topic, partitionConsumers := range c.partitionConsumers { + hwm := make(map[int32]int64, len(partitionConsumers)) + for partition, pc := range partitionConsumers { + hwm[partition] = pc.HighWaterMarkOffset() + } + hwms[topic] = hwm + } + + return hwms +} + // Close implements the Close method from the sarama.Consumer interface. It will close // all registered PartitionConsumer instances. func (c *Consumer) Close() error { From b0e729b8e95a2ad143570643cf3e9e1bf78a8ff5 Mon Sep 17 00:00:00 2001 From: Mathieu Payeur Levallois Date: Mon, 24 Oct 2016 14:48:06 -0400 Subject: [PATCH 3/3] Fix HighWaterMarks() comment --- consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index af93e8a81..696b8428f 100644 --- a/consumer.go +++ b/consumer.go @@ -63,7 +63,8 @@ type Consumer interface { // or OffsetOldest ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) - // HighWaterMarks returns the current high water marks for each topic and partitions + // HighWaterMarks returns the current high water marks for each topic and partition + // Consistency between partitions is not garanteed since high water marks are updated separately. HighWaterMarks() map[string]map[int32]int64 // Close shuts down the consumer. It must be called after all child