From d73c6db06dfc93f7c61b648c136ecc2a22a69e1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Mon, 25 Jul 2022 12:07:10 +0800 Subject: [PATCH] log-backup: remove the checkpoint after removal (#36452) close pingcap/tidb#36423 --- br/pkg/streamhelper/advancer.go | 10 ++++--- ...{stream_listener.go => advancer_cliext.go} | 25 +++++++++++++++--- br/pkg/streamhelper/advancer_env.go | 8 +++--- br/pkg/streamhelper/basic_lib_for_test.go | 8 ++++++ br/pkg/streamhelper/client.go | 11 -------- br/pkg/streamhelper/integration_test.go | 26 +++++++++++++++++-- 6 files changed, 65 insertions(+), 23 deletions(-) rename br/pkg/streamhelper/{stream_listener.go => advancer_cliext.go} (81%) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 1dc911b5f81bf..a74ee9b623c26 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -333,7 +333,7 @@ func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskE return nil } log.Info("meet task event", zap.Stringer("event", &e)) - if err := c.onTaskEvent(e); err != nil { + if err := c.onTaskEvent(ctx, e); err != nil { if errors.Cause(e.Err) != context.Canceled { log.Error("listen task meet error, would reopen.", logutil.ShortError(err)) return err @@ -391,7 +391,7 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) { return } log.Info("meet task event", zap.Stringer("event", &e)) - if err := c.onTaskEvent(e); err != nil { + if err := c.onTaskEvent(ctx, e); err != nil { if errors.Cause(e.Err) != context.Canceled { log.Error("listen task meet error, would reopen.", logutil.ShortError(err)) time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) }) @@ -403,7 +403,7 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) { }() } -func (c *CheckpointAdvancer) onTaskEvent(e TaskEvent) error { +func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e 
TaskEvent) error { c.taskMu.Lock() defer c.taskMu.Unlock() switch e.Type { @@ -412,6 +412,10 @@ func (c *CheckpointAdvancer) onTaskEvent(e TaskEvent) error { case EventDel: c.task = nil c.state = &fullScan{} + if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil { + log.Warn("failed to clear global checkpoint", logutil.ShortError(err)) + } + metrics.LastCheckpoint.DeleteLabelValues(e.Name) c.cache.Clear() case EventErr: return e.Err diff --git a/br/pkg/streamhelper/stream_listener.go b/br/pkg/streamhelper/advancer_cliext.go similarity index 81% rename from br/pkg/streamhelper/stream_listener.go rename to br/pkg/streamhelper/advancer_cliext.go index e48064613efdb..b9a6308327199 100644 --- a/br/pkg/streamhelper/stream_listener.go +++ b/br/pkg/streamhelper/advancer_cliext.go @@ -49,7 +49,7 @@ func (t *TaskEvent) String() string { return fmt.Sprintf("%s(%s)", t.Type, t.Name) } -type TaskEventClient struct { +type AdvancerExt struct { MetaDataClient } @@ -94,7 +94,7 @@ func eventFromWatch(resp clientv3.WatchResponse) ([]TaskEvent, error) { return result, nil } -func (t TaskEventClient) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) { +func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) { c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev)) handleResponse := func(resp clientv3.WatchResponse) bool { events, err := eventFromWatch(resp) @@ -139,7 +139,7 @@ func (t TaskEventClient) startListen(ctx context.Context, rev int64, ch chan<- T }() } -func (t TaskEventClient) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int64, error) { +func (t AdvancerExt) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int64, error) { tasks, rev, err := t.GetAllTasksWithRevision(ctx) if err != nil { return nil, 0, err @@ -156,7 +156,7 @@ func (t TaskEventClient) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, return events, rev, nil } -func (t TaskEventClient) 
Begin(ctx context.Context, ch chan<- TaskEvent) error { +func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error { initialTasks, rev, err := t.getFullTasksAsEvent(ctx) if err != nil { return err @@ -168,3 +168,20 @@ func (t TaskEventClient) Begin(ctx context.Context, ch chan<- TaskEvent) error { t.startListen(ctx, rev+1, ch) return nil } + +func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error { + key := GlobalCheckpointOf(taskName) + value := string(encodeUint64(checkpoint)) + _, err := t.KV.Put(ctx, key, value) + + if err != nil { + return err + } + return nil +} + +func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error { + key := GlobalCheckpointOf(taskName) + _, err := t.KV.Delete(ctx, key) + return err +} diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 21c61ff129ce2..181d8933449d4 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -51,7 +51,7 @@ func (c PDRegionScanner) RegionScan(ctx context.Context, key []byte, endKey []by // clusterEnv is the environment for running in the real cluster. 
type clusterEnv struct { clis *utils.StoreManager - *TaskEventClient + *AdvancerExt PDRegionScanner } @@ -71,7 +71,7 @@ func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (log func CliEnv(cli *utils.StoreManager, etcdCli *clientv3.Client) Env { return clusterEnv{ clis: cli, - TaskEventClient: &TaskEventClient{MetaDataClient: *NewMetaDataClient(etcdCli)}, + AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)}, PDRegionScanner: PDRegionScanner{cli.PDClient()}, } } @@ -87,7 +87,7 @@ func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (En Time: time.Duration(conf.TiKVClient.GrpcKeepAliveTime) * time.Second, Timeout: time.Duration(conf.TiKVClient.GrpcKeepAliveTimeout) * time.Second, }, tconf), - TaskEventClient: &TaskEventClient{MetaDataClient: *NewMetaDataClient(etcdCli)}, + AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)}, PDRegionScanner: PDRegionScanner{Client: pdCli}, }, nil } @@ -104,4 +104,6 @@ type StreamMeta interface { Begin(ctx context.Context, ch chan<- TaskEvent) error // UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store. UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error + // ClearV3GlobalCheckpointForTask clears the global checkpoint from the meta store. 
+ ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 14d777f1d24e7..7877077a03312 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -424,6 +424,14 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, return nil } +func (t *testEnv) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error { + t.mu.Lock() + defer t.mu.Unlock() + + t.checkpoint = 0 + return nil +} + func (t *testEnv) getCheckpoint() uint64 { t.mu.Lock() defer t.mu.Unlock() diff --git a/br/pkg/streamhelper/client.go b/br/pkg/streamhelper/client.go index 56a87c20d4fd8..2e27bf97a399e 100644 --- a/br/pkg/streamhelper/client.go +++ b/br/pkg/streamhelper/client.go @@ -195,17 +195,6 @@ func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName stri return nil } -func (c *MetaDataClient) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error { - key := GlobalCheckpointOf(taskName) - value := string(encodeUint64(checkpoint)) - _, err := c.KV.Put(ctx, key, value) - - if err != nil { - return err - } - return nil -} - // GetTask get the basic task handle from the metadata storage. 
func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error) { resp, err := c.Get(ctx, TaskOf(taskName)) diff --git a/br/pkg/streamhelper/integration_test.go b/br/pkg/streamhelper/integration_test.go index b8817061de79d..d60f53ac71bd0 100644 --- a/br/pkg/streamhelper/integration_test.go +++ b/br/pkg/streamhelper/integration_test.go @@ -139,7 +139,8 @@ func TestIntegration(t *testing.T) { t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) }) t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) }) t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli, etcd) }) - t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.TaskEventClient{MetaDataClient: metaCli}) }) + t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) + t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) } func TestChecking(t *testing.T) { @@ -265,7 +266,7 @@ func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient, require.Equal(t, uint64(10002), ts) } -func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) { +func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) { ctx, cancel := context.WithCancel(context.Background()) taskName := "simple" taskInfo := simpleTask(taskName, 4) @@ -295,3 +296,24 @@ func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) { _, ok := <-ch require.False(t, ok) } + +func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + task := "simple" + req := require.New(t) + getCheckpoint := func() uint64 { + resp, err := metaCli.KV.Get(ctx, streamhelper.GlobalCheckpointOf(task)) + req.NoError(err) + if len(resp.Kvs) == 0 { + return 0 + } + req.Len(resp.Kvs, 1) + 
return binary.BigEndian.Uint64(resp.Kvs[0].Value) + } + metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5) + req.EqualValues(5, getCheckpoint()) + metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18) + req.EqualValues(18, getCheckpoint()) + metaCli.ClearV3GlobalCheckpointForTask(ctx, task) + req.EqualValues(0, getCheckpoint()) +}