diff --git a/consumer.go b/consumer.go index 5271e21de..696b8428f 100644 --- a/consumer.go +++ b/consumer.go @@ -63,6 +63,10 @@ type Consumer interface { // or OffsetOldest ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) + // HighWaterMarks returns the current high water marks for each topic and partition + // Consistency between partitions is not garanteed since high water marks are updated separately. + HighWaterMarks() map[string]map[int32]int64 + // Close shuts down the consumer. It must be called after all child // PartitionConsumers have already been closed. Close() error @@ -163,6 +167,22 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) return child, nil } +func (c *consumer) HighWaterMarks() map[string]map[int32]int64 { + c.lock.Lock() + defer c.lock.Unlock() + + hwms := make(map[string]map[int32]int64) + for topic, p := range c.children { + hwm := make(map[int32]int64, len(p)) + for partition, pc := range p { + hwm[partition] = pc.HighWaterMarkOffset() + } + hwms[topic] = hwm + } + + return hwms +} + func (c *consumer) addChild(child *partitionConsumer) error { c.lock.Lock() defer c.lock.Unlock() diff --git a/mocks/consumer.go b/mocks/consumer.go index 09657c0ee..a524190a0 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -96,6 +96,22 @@ func (c *Consumer) Partitions(topic string) ([]int32, error) { return c.metadata[topic], nil } +func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { + c.l.Lock() + defer c.l.Unlock() + + hwms := make(map[string]map[int32]int64, len(c.partitionConsumers)) + for topic, partitionConsumers := range c.partitionConsumers { + hwm := make(map[int32]int64, len(partitionConsumers)) + for partition, pc := range partitionConsumers { + hwm[partition] = pc.HighWaterMarkOffset() + } + hwms[topic] = hwm + } + + return hwms +} + // Close implements the Close method from the sarama.Consumer interface. It will close // all registered PartitionConsumer instances. func (c *Consumer) Close() error {