From f552ad9c8a708973b74385bd3ec7a49267e25168 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Fri, 8 Dec 2023 17:54:17 +0800
Subject: [PATCH] plugin: fix bug that watch loop will refresh frequently when
 channel closed (#49275) (#49290)

close pingcap/tidb#49273
---
 pkg/plugin/BUILD.bazel    |  3 ++-
 pkg/plugin/plugin.go      | 28 +++++++++++++++++++++----
 pkg/plugin/plugin_test.go | 43 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/pkg/plugin/BUILD.bazel b/pkg/plugin/BUILD.bazel
index 0638939238dff..60157479817b1 100644
--- a/pkg/plugin/BUILD.bazel
+++ b/pkg/plugin/BUILD.bazel
@@ -39,7 +39,7 @@ go_test(
     ],
     embed = [":plugin"],
     flaky = True,
-    shard_count = 11,
+    shard_count = 12,
     deps = [
         "//pkg/kv",
         "//pkg/parser/mysql",
@@ -49,6 +49,7 @@ go_test(
         "//pkg/testkit",
         "//pkg/testkit/testsetup",
         "@com_github_stretchr_testify//require",
+        "@io_etcd_go_etcd_client_v3//:client",
         "@org_uber_go_goleak//:goleak",
     ],
 )
diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go
index 2d5c57ab62687..cf5401bbb8263 100644
--- a/pkg/plugin/plugin.go
+++ b/pkg/plugin/plugin.go
@@ -21,6 +21,7 @@ import (
 	"strconv"
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/errors"
@@ -275,14 +276,33 @@ func (w *flushWatcher) refreshPluginState() error {
 	}
 	return nil
 }
-
 func (w *flushWatcher) watchLoop() {
-	watchChan := w.etcd.Watch(w.ctx, w.path)
+	const reWatchInterval = time.Second * 5
+	logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name))
+	for w.ctx.Err() == nil {
+		ch := w.etcd.Watch(w.ctx, w.path)
+		if exit := w.watchLoopWithChan(ch); exit {
+			break
+		}
+
+		logutil.BgLogger().Info(
+			"plugin flushWatcher old chan closed, restart loop later",
+			zap.String("plugin", w.manifest.Name),
+			zap.Duration("after", reWatchInterval))
+		time.Sleep(reWatchInterval)
+	}
+}
+
+func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) {
 	for {
 		select {
 		case <-w.ctx.Done():
-			return
-		case <-watchChan:
+			return true
+		case _, ok := <-ch:
+			if !ok {
+				return false
+			}
+			logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name))
 			_ = w.refreshPluginState()
 		}
 	}
diff --git a/pkg/plugin/plugin_test.go b/pkg/plugin/plugin_test.go
index bc09aa7da1469..daa1965b7758f 100644
--- a/pkg/plugin/plugin_test.go
+++ b/pkg/plugin/plugin_test.go
@@ -18,11 +18,14 @@ import (
 	"context"
 	"io"
 	"strconv"
+	"sync/atomic"
 	"testing"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/stretchr/testify/require"
+	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
 func TestLoadStaticRegisteredPlugin(t *testing.T) {
@@ -316,3 +319,43 @@ func TestPluginsClone(t *testing.T) {
 	require.Equal(t, uint16(1), cps.versions["whitelist"])
 	require.Len(t, cps.dyingPlugins, 1)
 }
+
+func TestPluginWatcherLoop(t *testing.T) {
+	// exit when ctx done: watchLoopWithChan must report exit=true once the context is cancelled
+	ctx, cancel := context.WithCancel(context.Background())
+	watcher := &flushWatcher{
+		ctx: ctx,
+		manifest: &Manifest{
+			Name: "test",
+		},
+	}
+	ch := make(chan clientv3.WatchResponse)
+	var cancelled atomic.Bool
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		cancelled.Store(true)
+		cancel()
+	}()
+	exit := watcher.watchLoopWithChan(ch)
+	require.True(t, exit)
+	require.True(t, cancelled.Load())
+
+	// exit when ch closed: watchLoopWithChan must report exit=false so watchLoop can re-create the watch
+	watcher = &flushWatcher{
+		ctx: context.Background(),
+		manifest: &Manifest{
+			Name: "test",
+		},
+	}
+
+	var closed atomic.Bool
+	ch = make(chan clientv3.WatchResponse)
+	go func() {
+		time.Sleep(10 * time.Millisecond)
+		closed.Store(true)
+		close(ch)
+	}()
+	exit = watcher.watchLoopWithChan(ch)
+	require.False(t, exit)
+	require.True(t, closed.Load())
+}