Skip to content

Commit

Permalink
log-backup: remove the checkpoint after removal (#36452)
Browse files Browse the repository at this point in the history
close #36423
  • Loading branch information
YuJuncen authored Jul 25, 2022
1 parent 48e496a commit d73c6db
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 23 deletions.
10 changes: 7 additions & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskE
return nil
}
log.Info("meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(e); err != nil {
if err := c.onTaskEvent(ctx, e); err != nil {
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
return err
Expand Down Expand Up @@ -391,7 +391,7 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
return
}
log.Info("meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(e); err != nil {
if err := c.onTaskEvent(ctx, e); err != nil {
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) })
Expand All @@ -403,7 +403,7 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
}()
}

func (c *CheckpointAdvancer) onTaskEvent(e TaskEvent) error {
func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
switch e.Type {
Expand All @@ -412,6 +412,10 @@ func (c *CheckpointAdvancer) onTaskEvent(e TaskEvent) error {
case EventDel:
c.task = nil
c.state = &fullScan{}
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
c.cache.Clear()
case EventErr:
return e.Err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (t *TaskEvent) String() string {
return fmt.Sprintf("%s(%s)", t.Type, t.Name)
}

type TaskEventClient struct {
type AdvancerExt struct {
MetaDataClient
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func eventFromWatch(resp clientv3.WatchResponse) ([]TaskEvent, error) {
return result, nil
}

func (t TaskEventClient) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := eventFromWatch(resp)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (t TaskEventClient) startListen(ctx context.Context, rev int64, ch chan<- T
}()
}

func (t TaskEventClient) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int64, error) {
func (t AdvancerExt) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int64, error) {
tasks, rev, err := t.GetAllTasksWithRevision(ctx)
if err != nil {
return nil, 0, err
Expand All @@ -156,7 +156,7 @@ func (t TaskEventClient) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent,
return events, rev, nil
}

func (t TaskEventClient) Begin(ctx context.Context, ch chan<- TaskEvent) error {
func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
initialTasks, rev, err := t.getFullTasksAsEvent(ctx)
if err != nil {
return err
Expand All @@ -168,3 +168,20 @@ func (t TaskEventClient) Begin(ctx context.Context, ch chan<- TaskEvent) error {
t.startListen(ctx, rev+1, ch)
return nil
}

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
_, err := t.KV.Put(ctx, key, value)

if err != nil {
return err
}
return nil
}

func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error {
key := GlobalCheckpointOf(taskName)
_, err := t.KV.Delete(ctx, key)
return err
}
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c PDRegionScanner) RegionScan(ctx context.Context, key []byte, endKey []by
// clusterEnv is the environment for running in the real cluster.
type clusterEnv struct {
clis *utils.StoreManager
*TaskEventClient
*AdvancerExt
PDRegionScanner
}

Expand All @@ -71,7 +71,7 @@ func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (log
func CliEnv(cli *utils.StoreManager, etcdCli *clientv3.Client) Env {
return clusterEnv{
clis: cli,
TaskEventClient: &TaskEventClient{MetaDataClient: *NewMetaDataClient(etcdCli)},
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
PDRegionScanner: PDRegionScanner{cli.PDClient()},
}
}
Expand All @@ -87,7 +87,7 @@ func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (En
Time: time.Duration(conf.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(conf.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}, tconf),
TaskEventClient: &TaskEventClient{MetaDataClient: *NewMetaDataClient(etcdCli)},
AdvancerExt: &AdvancerExt{MetaDataClient: *NewMetaDataClient(etcdCli)},
PDRegionScanner: PDRegionScanner{Client: pdCli},
}, nil
}
Expand All @@ -104,4 +104,6 @@ type StreamMeta interface {
Begin(ctx context.Context, ch chan<- TaskEvent) error
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
}
8 changes: 8 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
return nil
}

func (t *testEnv) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error {
t.mu.Lock()
defer t.mu.Unlock()

t.checkpoint = 0
return nil
}

func (t *testEnv) getCheckpoint() uint64 {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
11 changes: 0 additions & 11 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,6 @@ func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName stri
return nil
}

func (c *MetaDataClient) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
_, err := c.KV.Put(ctx, key, value)

if err != nil {
return err
}
return nil
}

// GetTask get the basic task handle from the metadata storage.
func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error) {
resp, err := c.Get(ctx, TaskOf(taskName))
Expand Down
26 changes: 24 additions & 2 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func TestIntegration(t *testing.T) {
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli, etcd) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.TaskEventClient{MetaDataClient: metaCli}) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
Expand Down Expand Up @@ -265,7 +266,7 @@ func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient,
require.Equal(t, uint64(10002), ts)
}

func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) {
func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx, cancel := context.WithCancel(context.Background())
taskName := "simple"
taskInfo := simpleTask(taskName, 4)
Expand Down Expand Up @@ -295,3 +296,24 @@ func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) {
_, ok := <-ch
require.False(t, ok)
}

func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
task := "simple"
req := require.New(t)
getCheckpoint := func() uint64 {
resp, err := metaCli.KV.Get(ctx, streamhelper.GlobalCheckpointOf(task))
req.NoError(err)
if len(resp.Kvs) == 0 {
return 0
}
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5)
req.EqualValues(5, getCheckpoint())
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18)
req.EqualValues(18, getCheckpoint())
metaCli.ClearV3GlobalCheckpointForTask(ctx, task)
req.EqualValues(0, getCheckpoint())
}

0 comments on commit d73c6db

Please sign in to comment.