From 94be9d4dbed9a0c5ac573b88873ab1213d5c6724 Mon Sep 17 00:00:00 2001 From: xhbpiao Date: Thu, 16 Nov 2017 20:50:09 +0800 Subject: [PATCH] add block.LastStableOffset check ,only block.LastStableOffset >0 meaningful --- consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index cb6f031d0..6d16dc779 100644 --- a/consumer.go +++ b/consumer.go @@ -547,7 +547,7 @@ func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*Cons incomplete = true } - if child.offset > block.LastStableOffset { + if block.LastStableOffset > 0 && child.offset > block.LastStableOffset { // We reached the end of closed transactions break }