From a9727e7fcfa6292a6f29b48efd49703b3a8931ac Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 10 Oct 2022 16:10:27 +0800 Subject: [PATCH 1/3] support restart log backup task Signed-off-by: joccau --- br/pkg/backup/client.go | 26 ++++++++-- br/pkg/task/backup.go | 2 +- br/pkg/task/backup_ebs.go | 2 +- br/pkg/task/backup_raw.go | 2 +- br/pkg/task/stream.go | 104 +++++++++++++++++++++++++------------- 5 files changed, 93 insertions(+), 43 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 062fb771c0e5e..6ce3d24b0e209 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -183,13 +183,17 @@ func (bc *Client) GetStorage() storage.ExternalStorage { return bc.storage } -// SetStorage set ExternalStorage for client. -func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error { - var err error - bc.storage, err = storage.New(ctx, backend, opts) +// SetStorageAndCheckNotInUse sets ExternalStorage for client and check storage not in used by others. +func (bc *Client) SetStorageAndCheckNotInUse( + ctx context.Context, + backend *backuppb.StorageBackend, + opts *storage.ExternalStorageOptions, +) error { + err := bc.SetStorage(ctx, backend, opts) if err != nil { return errors.Trace(err) } + // backupmeta already exists exist, err := bc.storage.FileExists(ctx, metautil.MetaFile) if err != nil { @@ -204,10 +208,22 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke if err != nil { return err } - bc.backend = backend return nil } +// SetStorage sets ExternalStorage for client. +func (bc *Client) SetStorage( + ctx context.Context, + backend *backuppb.StorageBackend, + opts *storage.ExternalStorageOptions, +) error { + var err error + + bc.backend = backend + bc.storage, err = storage.New(ctx, backend, opts) + return errors.Trace(err) +} + // GetClusterID returns the cluster ID of the tidb cluster to backup. func (bc *Client) GetClusterID() uint64 { return bc.clusterID diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 6654409c46a6a..8b3a03c9a8719 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -329,7 +329,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig SendCredentials: cfg.SendCreds, CheckS3ObjectLockOptions: true, } - if err = client.SetStorage(ctx, u, &opts); err != nil { + if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil { return errors.Trace(err) } err = client.SetLockFile(ctx) diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index ec836fa83722c..aa634c671cb44 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -120,7 +120,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, } - if err = client.SetStorage(ctx, backend, &opts); err != nil { + if err = client.SetStorageAndCheckNotInUse(ctx, backend, &opts); err != nil { return errors.Trace(err) } err = client.SetLockFile(ctx) diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 8a3ca2b17b622..9d46c151d23c7 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -153,7 +153,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf SendCredentials: cfg.SendCreds, CheckS3ObjectLockOptions: true, } - if err = client.SetStorage(ctx, u, &opts); err != nil { + if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 9a1a06eff9693..35a927fb056a1 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -307,6 +307,11 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS mgr: mgr, } if isStreamStart { + client, err := backup.NewBackupClient(ctx, mgr) + if err != nil { + return nil, errors.Trace(err) + } + backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) if err != nil { return nil, errors.Trace(err) @@ -316,11 +321,6 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, } - client, err := backup.NewBackupClient(ctx, mgr) - if err != nil { - return nil, errors.Trace(err) - } - if err = client.SetStorage(ctx, backend, &opts); err != nil { return nil, errors.Trace(err) } @@ -336,6 +336,10 @@ func (s *streamMgr) close() { s.mgr.Close() } +func (s *streamMgr) checkLock(ctx context.Context) (bool, error) { + return s.bc.GetStorage().FileExists(ctx, metautil.LockFile) +} + func (s *streamMgr) setLock(ctx context.Context) error { return s.bc.SetLockFile(ctx) } @@ -352,7 +356,7 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error { s.cfg.StartTS = currentTS } - if currentTS < s.cfg.StartTS || s.cfg.EndTS <= currentTS { + if currentTS < s.cfg.StartTS { return errors.Annotatef(berrors.ErrInvalidArgument, "invalid timestamps, startTS %d should be smaller than currentTS %d", s.cfg.StartTS, currentTS) @@ -435,6 +439,28 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error { return nil } +func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error { + se, err := g.CreateSession(s.mgr.GetStorage()) + if err != nil { + return errors.Trace(err) + } + execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) + supportStream, err := utils.IsLogBackupEnabled(execCtx) + if err != nil { + return errors.Trace(err) + } + if !supportStream { + return errors.New("Unable to create task about log-backup. " + + "please set TiKV config `log-backup.enable` to true and restart TiKVs.") + } + if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) { + return errors.Annotate(berrors.ErrUnknown, + "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.") + } + + return nil +} + // RunStreamCommand run all kinds of `stream task` func RunStreamCommand( ctx context.Context, @@ -485,38 +511,13 @@ func RunStreamStart( } defer streamMgr.close() - se, err := g.CreateSession(streamMgr.mgr.GetStorage()) - if err != nil { + if err = streamMgr.checkStreamStartEnable(g); err != nil { return errors.Trace(err) } - execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) - supportStream, err := utils.IsLogBackupEnabled(execCtx) - if err != nil { - return errors.Trace(err) - } - if !supportStream { - return errors.New("Unable to create task about log-backup. " + - "please set TiKV config `log-backup.enable` to true and restart TiKVs.") - } - if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) { - return errors.Annotate(berrors.ErrUnknown, "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.") - } - if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil { return errors.Trace(err) } - if err = streamMgr.setGCSafePoint( - ctx, - utils.BRServiceSafePoint{ - ID: utils.MakeSafePointID(), - TTL: cfg.SafePointTTL, - BackupTS: cfg.StartTS, - }, - ); err != nil { - return errors.Trace(err) - } - cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient()) // It supports single stream log task currently. if count, err := cli.GetTaskCount(ctx); err != nil { @@ -525,11 +526,40 @@ func RunStreamStart( return errors.Annotate(berrors.ErrStreamLogTaskExist, "It supports single stream log task currently") } - if err = streamMgr.setLock(ctx); err != nil { + exist, err := streamMgr.checkLock(ctx) + if err != nil { return errors.Trace(err) } + // exist is true, which represents restart a stream task. Or create a new stream task. + if exist { + logInfo, err := getLogRange(ctx, &cfg.Config) + if err != nil { + return errors.Trace(err) + } + if logInfo.clusterID > 0 && logInfo.clusterID != streamMgr.bc.GetClusterID() { + return errors.Annotatef(berrors.ErrInvalidArgument, + "the stream log files from cluster ID:%v and current cluster ID:%v ", + logInfo.clusterID, streamMgr.bc.GetClusterID()) + } + + cfg.StartTS = logInfo.logMaxTS + } else { + if err = streamMgr.setLock(ctx); err != nil { + return errors.Trace(err) + } + if err = streamMgr.backupFullSchemas(ctx, g); err != nil { + return errors.Trace(err) + } + } - if err = streamMgr.backupFullSchemas(ctx, g); err != nil { + if err = streamMgr.setGCSafePoint( + ctx, + utils.BRServiceSafePoint{ + ID: utils.MakeSafePointID(), + TTL: cfg.SafePointTTL, + BackupTS: cfg.StartTS, + }, + ); err != nil { return errors.Trace(err) } @@ -556,7 +586,6 @@ func RunStreamStart( Ranges: ranges, Pausing: false, } - if err = cli.PutTask(ctx, ti); err != nil { return errors.Trace(err) } @@ -1336,6 +1365,11 @@ func getLogRange( if err = backupMeta.Unmarshal(metaData); err != nil { return backupLogInfo{}, errors.Trace(err) } + // endVersion > 0 represents that the storage has been used for `br backup` + if backupMeta.GetEndVersion() > 0 { + return backupLogInfo{}, errors.Annotate(berrors.ErrStorageUnknown, + "the storage has been used for full backup") + } logStartTS := backupMeta.GetStartVersion() // truncateTS: get log truncate ts from TruncateSafePointFileName. From 81ad2506fefe473267cbebb21e977ef1cb9539bd Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 27 Oct 2022 11:20:22 +0800 Subject: [PATCH 2/3] deal comments in pr Signed-off-by: joccau --- br/pkg/task/stream.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 35a927fb056a1..52ca27c830e23 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -543,7 +543,27 @@ func RunStreamStart( } cfg.StartTS = logInfo.logMaxTS + if err = streamMgr.setGCSafePoint( + ctx, + utils.BRServiceSafePoint{ + ID: utils.MakeSafePointID(), + TTL: cfg.SafePointTTL, + BackupTS: cfg.StartTS, + }, + ); err != nil { + return errors.Trace(err) + } } else { + if err = streamMgr.setGCSafePoint( + ctx, + utils.BRServiceSafePoint{ + ID: utils.MakeSafePointID(), + TTL: cfg.SafePointTTL, + BackupTS: cfg.StartTS, + }, + ); err != nil { + return errors.Trace(err) + } if err = streamMgr.setLock(ctx); err != nil { return errors.Trace(err) } @@ -552,17 +572,6 @@ func RunStreamStart( } } - if err = streamMgr.setGCSafePoint( - ctx, - utils.BRServiceSafePoint{ - ID: utils.MakeSafePointID(), - TTL: cfg.SafePointTTL, - BackupTS: cfg.StartTS, - }, - ); err != nil { - return errors.Trace(err) - } - ranges, err := streamMgr.buildObserveRanges(ctx) if err != nil { return errors.Trace(err) From a149c11fce60b32a889ad734583de741257b86d3 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 27 Oct 2022 12:10:44 +0800 Subject: [PATCH 3/3] add unit cases Signed-off-by: joccau --- br/pkg/task/stream_test.go | 49 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index 7477e5d622096..3ef57a71a07ef 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -21,8 +21,11 @@ import ( "path/filepath" "testing" + "github.com/golang/protobuf/proto" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" "github.com/stretchr/testify/require" @@ -261,3 +264,49 @@ func TestGetGlobalCheckpointFromStorage(t *testing.T) { require.Nil(t, err) require.Equal(t, ts, uint64(99)) } + +func TestGetLogRangeWithFullBackupDir(t *testing.T) { + var fullBackupTS uint64 = 123456 + testDir := t.TempDir() + storage, err := storage.NewLocalStorage(testDir) + require.Nil(t, err) + + m := backuppb.BackupMeta{ + EndVersion: fullBackupTS, + } + data, err := proto.Marshal(&m) + require.Nil(t, err) + + err = storage.WriteFile(context.TODO(), metautil.MetaFile, data) + require.Nil(t, err) + + cfg := Config{ + Storage: testDir, + } + _, err = getLogRange(context.TODO(), &cfg) + require.Error(t, err, errors.Annotate(berrors.ErrStorageUnknown, + "the storage has been used for full backup")) +} + +func TestGetLogRangeWithLogBackupDir(t *testing.T) { + var startLogBackupTS uint64 = 123456 + testDir := t.TempDir() + storage, err := storage.NewLocalStorage(testDir) + require.Nil(t, err) + + m := backuppb.BackupMeta{ + StartVersion: startLogBackupTS, + } + data, err := proto.Marshal(&m) + require.Nil(t, err) + + err = storage.WriteFile(context.TODO(), metautil.MetaFile, data) + require.Nil(t, err) + + cfg := Config{ + Storage: testDir, + } + logInfo, err := getLogRange(context.TODO(), &cfg) + require.Nil(t, err) + require.Equal(t, logInfo.logMinTS, startLogBackupTS) +}