From 60cfbfdc82604d9e1259c5eadef1d64a95b40418 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Tue, 10 Sep 2024 11:43:54 +0800 Subject: [PATCH] [bug] logtail: clear the tables in global stats when logtail re-connect --- pkg/vm/engine/disttae/db.go | 3 +++ pkg/vm/engine/disttae/stats.go | 9 +++++++++ pkg/vm/engine/disttae/stats_test.go | 13 +++++++++++++ 3 files changed, 25 insertions(+) diff --git a/pkg/vm/engine/disttae/db.go b/pkg/vm/engine/disttae/db.go index 1852f059f3409..a2dd4ecd3b307 100644 --- a/pkg/vm/engine/disttae/db.go +++ b/pkg/vm/engine/disttae/db.go @@ -257,6 +257,9 @@ func (e *Engine) init(ctx context.Context) error { bat.Clean(m) } + // clear all tables in global stats. + e.globalStats.clearTables() + return nil } diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 72c2da279d3d4..022bf97cd3ac2 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -266,6 +266,15 @@ func (gs *GlobalStats) RemoveTid(tid uint64) { delete(gs.logtailUpdate.mu.updated, tid) } +// clearTables clears the tables in the map if there are any tables in it. +func (gs *GlobalStats) clearTables() { + gs.logtailUpdate.mu.Lock() + defer gs.logtailUpdate.mu.Unlock() + if len(gs.logtailUpdate.mu.updated) > 0 { + gs.logtailUpdate.mu.updated = make(map[uint64]struct{}) + } +} + func (gs *GlobalStats) enqueue(tail *logtail.TableLogtail) { select { case gs.tailC <- tail: diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index dc7262bc59b30..e98cf4a40390b 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -74,3 +74,16 @@ func TestGlobalStats_ShouldUpdate(t *testing.T) { assert.Equal(t, 1, int(count.Load())) }) } + +func TestGlobalStats_ClearTables(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gs := NewGlobalStats(ctx, nil, nil) + for i := 0; i < 10; i++ { + gs.notifyLogtailUpdate(uint64(2000 + i)) + } + assert.Equal(t, 10, len(gs.logtailUpdate.mu.updated)) + gs.clearTables() + assert.Equal(t, 0, len(gs.logtailUpdate.mu.updated)) +}