From 2640f31ca721cd82b237650a5e946fe10cf65ce3 Mon Sep 17 00:00:00 2001 From: crimson <1291463831@qq.com> Date: Mon, 22 Apr 2024 20:29:48 +0800 Subject: [PATCH] allow process when no data is fetched --- consumer/tasks.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/consumer/tasks.go b/consumer/tasks.go index 1df4fd66..96e004cb 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -6,6 +6,7 @@ import ( "runtime" "time" + sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log/level" ) @@ -93,8 +94,12 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint s err = fmt.Errorf("get a panic when process: %v", r) } }() + groupList := consumer.lastFetchLogGroupList + if groupList == nil { + groupList = &sls.LogGroupList{} + } if consumer.lastFetchLogGroupList != nil { - rollBackCheckpoint, err = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) + rollBackCheckpoint, err = consumer.processor.Process(consumer.shardId, groupList, consumer.consumerCheckPointTracker) consumer.saveCheckPointIfNeeded() if err != nil { return