From c4316fbc74ca3546a74e3d76e40d71673576a1e7 Mon Sep 17 00:00:00 2001
From: Zak Zhao <57036248+joccau@users.noreply.github.com>
Date: Thu, 27 Oct 2022 13:19:57 +0800
Subject: [PATCH] log-backup: support restart task after stop a log-backup
 task (#38381)

close pingcap/tidb#38382
---
 br/pkg/backup/client.go    |  26 +++++++--
 br/pkg/task/backup.go      |   2 +-
 br/pkg/task/backup_ebs.go  |   2 +-
 br/pkg/task/backup_raw.go  |   2 +-
 br/pkg/task/stream.go      | 115 +++++++++++++++++++++++++------------
 br/pkg/task/stream_test.go |  49 ++++++++++++++++
 6 files changed, 152 insertions(+), 44 deletions(-)

diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 062fb771c0e5e..6ce3d24b0e209 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -183,13 +183,17 @@ func (bc *Client) GetStorage() storage.ExternalStorage {
 	return bc.storage
 }
 
-// SetStorage set ExternalStorage for client.
-func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
-	var err error
-	bc.storage, err = storage.New(ctx, backend, opts)
+// SetStorageAndCheckNotInUse sets ExternalStorage for the client and checks that the storage is not in use by others.
+func (bc *Client) SetStorageAndCheckNotInUse(
+	ctx context.Context,
+	backend *backuppb.StorageBackend,
+	opts *storage.ExternalStorageOptions,
+) error {
+	err := bc.SetStorage(ctx, backend, opts)
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	// check whether backupmeta already exists
 	exist, err := bc.storage.FileExists(ctx, metautil.MetaFile)
 	if err != nil {
@@ -204,10 +208,22 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
 	if err != nil {
 		return err
 	}
-	bc.backend = backend
 	return nil
 }
 
+// SetStorage sets ExternalStorage for the client.
+func (bc *Client) SetStorage(
+	ctx context.Context,
+	backend *backuppb.StorageBackend,
+	opts *storage.ExternalStorageOptions,
+) error {
+	var err error
+
+	bc.backend = backend
+	bc.storage, err = storage.New(ctx, backend, opts)
+	return errors.Trace(err)
+}
+
 // GetClusterID returns the cluster ID of the tidb cluster to backup.
 func (bc *Client) GetClusterID() uint64 {
 	return bc.clusterID
diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go
index 6654409c46a6a..8b3a03c9a8719 100644
--- a/br/pkg/task/backup.go
+++ b/br/pkg/task/backup.go
@@ -329,7 +329,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 		SendCredentials:          cfg.SendCreds,
 		CheckS3ObjectLockOptions: true,
 	}
-	if err = client.SetStorage(ctx, u, &opts); err != nil {
+	if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil {
 		return errors.Trace(err)
 	}
 	err = client.SetLockFile(ctx)
diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go
index ec836fa83722c..aa634c671cb44 100644
--- a/br/pkg/task/backup_ebs.go
+++ b/br/pkg/task/backup_ebs.go
@@ -120,7 +120,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
 		NoCredentials:   cfg.NoCreds,
 		SendCredentials: cfg.SendCreds,
 	}
-	if err = client.SetStorage(ctx, backend, &opts); err != nil {
+	if err = client.SetStorageAndCheckNotInUse(ctx, backend, &opts); err != nil {
 		return errors.Trace(err)
 	}
 	err = client.SetLockFile(ctx)
diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go
index 8a3ca2b17b622..9d46c151d23c7 100644
--- a/br/pkg/task/backup_raw.go
+++ b/br/pkg/task/backup_raw.go
@@ -153,7 +153,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
 		SendCredentials:          cfg.SendCreds,
 		CheckS3ObjectLockOptions: true,
 	}
-	if err = client.SetStorage(ctx, u, &opts); err != nil {
+	if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index b63110431ca06..64891ddad3321 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -310,6 +310,11 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS
 		mgr: mgr,
 	}
 	if isStreamStart {
+		client, err := backup.NewBackupClient(ctx, mgr)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
 		backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -319,11 +324,6 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS
 			NoCredentials:   cfg.NoCreds,
 			SendCredentials: cfg.SendCreds,
 		}
-		client, err := backup.NewBackupClient(ctx, mgr)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-
 		if err = client.SetStorage(ctx, backend, &opts); err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -339,6 +339,10 @@ func (s *streamMgr) close() {
 	s.mgr.Close()
 }
 
+func (s *streamMgr) checkLock(ctx context.Context) (bool, error) {
+	return s.bc.GetStorage().FileExists(ctx, metautil.LockFile)
+}
+
 func (s *streamMgr) setLock(ctx context.Context) error {
 	return s.bc.SetLockFile(ctx)
 }
@@ -355,7 +359,7 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error {
 		s.cfg.StartTS = currentTS
 	}
 
-	if currentTS < s.cfg.StartTS || s.cfg.EndTS <= currentTS {
+	if currentTS < s.cfg.StartTS {
 		return errors.Annotatef(berrors.ErrInvalidArgument,
 			"invalid timestamps, startTS %d should be smaller than currentTS %d",
 			s.cfg.StartTS, currentTS)
@@ -438,6 +442,28 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
 	return nil
 }
 
+func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
+	se, err := g.CreateSession(s.mgr.GetStorage())
+	if err != nil {
+		return errors.Trace(err)
+	}
+	execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
+	supportStream, err := utils.IsLogBackupEnabled(execCtx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if !supportStream {
+		return errors.New("Unable to create the log-backup task. " +
+			"Please set the TiKV config `log-backup.enable` to true and restart TiKVs.")
+	}
+	if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
+		return errors.Annotate(berrors.ErrUnknown,
+			"Unable to create the log-backup task. Please wait until the DDL jobs (add index with ingest method) are finished.")
+	}
+
+	return nil
+}
+
 // RunStreamCommand run all kinds of `stream task`
 func RunStreamCommand(
 	ctx context.Context,
@@ -488,38 +514,13 @@ func RunStreamStart(
 	}
 	defer streamMgr.close()
 
-	se, err := g.CreateSession(streamMgr.mgr.GetStorage())
-	if err != nil {
-		return errors.Trace(err)
-	}
-	execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
-	supportStream, err := utils.IsLogBackupEnabled(execCtx)
-	if err != nil {
+	if err = streamMgr.checkStreamStartEnable(g); err != nil {
 		return errors.Trace(err)
 	}
-	if !supportStream {
-		return errors.New("Unable to create task about log-backup. " +
-			"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
-	}
-	if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
-		return errors.Annotate(berrors.ErrUnknown, "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
-	}
-
 	if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
 		return errors.Trace(err)
 	}
-	if err = streamMgr.setGCSafePoint(
-		ctx,
-		utils.BRServiceSafePoint{
-			ID:       utils.MakeSafePointID(),
-			TTL:      cfg.SafePointTTL,
-			BackupTS: cfg.StartTS,
-		},
-	); err != nil {
-		return errors.Trace(err)
-	}
-
 	cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
 	// It supports single stream log task currently.
 	if count, err := cli.GetTaskCount(ctx); err != nil {
@@ -528,12 +529,50 @@ func RunStreamStart(
 		return errors.Annotate(berrors.ErrStreamLogTaskExist, "It supports single stream log task currently")
 	}
 
-	if err = streamMgr.setLock(ctx); err != nil {
+	exist, err := streamMgr.checkLock(ctx)
+	if err != nil {
 		return errors.Trace(err)
 	}
+	// If the lock file already exists, restart the stopped stream task; otherwise, create a new one.
+	if exist {
+		logInfo, err := getLogRange(ctx, &cfg.Config)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if logInfo.clusterID > 0 && logInfo.clusterID != streamMgr.bc.GetClusterID() {
+			return errors.Annotatef(berrors.ErrInvalidArgument,
+				"the stream log files come from cluster ID:%v, which mismatches the current cluster ID:%v",
+				logInfo.clusterID, streamMgr.bc.GetClusterID())
+		}
 
-	if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
-		return errors.Trace(err)
+		cfg.StartTS = logInfo.logMaxTS
+		if err = streamMgr.setGCSafePoint(
+			ctx,
+			utils.BRServiceSafePoint{
+				ID:       utils.MakeSafePointID(),
+				TTL:      cfg.SafePointTTL,
+				BackupTS: cfg.StartTS,
+			},
+		); err != nil {
+			return errors.Trace(err)
+		}
+	} else {
+		if err = streamMgr.setGCSafePoint(
+			ctx,
+			utils.BRServiceSafePoint{
+				ID:       utils.MakeSafePointID(),
+				TTL:      cfg.SafePointTTL,
+				BackupTS: cfg.StartTS,
+			},
+		); err != nil {
+			return errors.Trace(err)
+		}
+		if err = streamMgr.setLock(ctx); err != nil {
+			return errors.Trace(err)
+		}
+		if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
+			return errors.Trace(err)
+		}
 	}
 
 	ranges, err := streamMgr.buildObserveRanges(ctx)
@@ -559,7 +598,6 @@ func RunStreamStart(
 		Ranges:  ranges,
 		Pausing: false,
 	}
-
 	if err = cli.PutTask(ctx, ti); err != nil {
 		return errors.Trace(err)
 	}
@@ -1381,6 +1419,11 @@ func getLogRange(
 	if err = backupMeta.Unmarshal(metaData); err != nil {
 		return backupLogInfo{}, errors.Trace(err)
 	}
+	// endVersion > 0 means that the storage has already been used for a full `br backup`.
+	if backupMeta.GetEndVersion() > 0 {
+		return backupLogInfo{}, errors.Annotate(berrors.ErrStorageUnknown,
+			"the storage has been used for full backup")
+	}
 	logStartTS := backupMeta.GetStartVersion()
 
 	// truncateTS: get log truncate ts from TruncateSafePointFileName.
diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go
index 7477e5d622096..3ef57a71a07ef 100644
--- a/br/pkg/task/stream_test.go
+++ b/br/pkg/task/stream_test.go
@@ -21,8 +21,11 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/pingcap/errors"
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
+	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/metautil"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
 	"github.com/stretchr/testify/require"
@@ -261,3 +264,49 @@ func TestGetGlobalCheckpointFromStorage(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, ts, uint64(99))
 }
+
+func TestGetLogRangeWithFullBackupDir(t *testing.T) {
+	var fullBackupTS uint64 = 123456
+	testDir := t.TempDir()
+	storage, err := storage.NewLocalStorage(testDir)
+	require.Nil(t, err)
+
+	m := backuppb.BackupMeta{
+		EndVersion: fullBackupTS,
+	}
+	data, err := proto.Marshal(&m)
+	require.Nil(t, err)
+
+	err = storage.WriteFile(context.TODO(), metautil.MetaFile, data)
+	require.Nil(t, err)
+
+	cfg := Config{
+		Storage: testDir,
+	}
+	_, err = getLogRange(context.TODO(), &cfg)
+	require.Error(t, err, errors.Annotate(berrors.ErrStorageUnknown,
+		"the storage has been used for full backup"))
+}
+
+func TestGetLogRangeWithLogBackupDir(t *testing.T) {
+	var startLogBackupTS uint64 = 123456
+	testDir := t.TempDir()
+	storage, err := storage.NewLocalStorage(testDir)
+	require.Nil(t, err)
+
+	m := backuppb.BackupMeta{
+		StartVersion: startLogBackupTS,
+	}
+	data, err := proto.Marshal(&m)
+	require.Nil(t, err)
+
+	err = storage.WriteFile(context.TODO(), metautil.MetaFile, data)
+	require.Nil(t, err)
+
+	cfg := Config{
+		Storage: testDir,
+	}
+	logInfo, err := getLogRange(context.TODO(), &cfg)
+	require.Nil(t, err)
+	require.Equal(t, logInfo.logMinTS, startLogBackupTS)
+}
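
Reviewer note: the heart of this patch is the new branch in RunStreamStart, which probes the target storage for BR's lock file and either resumes a stopped log-backup task from the recovered log range or initializes a fresh one. The standalone sketch below condenses that decision for review; it is not part of the patch, the helper name decideRestart and the paths in main are invented for illustration, and it leans only on APIs the diff itself exercises (ExternalStorage.FileExists/ReadFile, metautil.LockFile/MetaFile, and backuppb.BackupMeta).

package main

import (
	"context"
	"fmt"

	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/br/pkg/metautil"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// decideRestart is a hypothetical condensation of the restart logic:
// a lock file in the target storage marks a previously started task, so BR
// resumes it instead of failing; a clean directory takes the original path
// (set the GC safe point, write the lock file, back up full schemas).
func decideRestart(ctx context.Context, s storage.ExternalStorage, clusterID uint64) (restart bool, resumeTS uint64, err error) {
	exist, err := s.FileExists(ctx, metautil.LockFile)
	if err != nil {
		return false, 0, err
	}
	if !exist {
		// New task: the caller keeps its configured StartTS.
		return false, 0, nil
	}

	// Restart: recover the previous log range from backupmeta,
	// mirroring the checks getLogRange performs in the patch.
	data, err := s.ReadFile(ctx, metautil.MetaFile)
	if err != nil {
		return false, 0, err
	}
	var meta backuppb.BackupMeta
	if err := meta.Unmarshal(data); err != nil {
		return false, 0, err
	}
	// A nonzero EndVersion means the directory holds a full backup, not log files.
	if meta.GetEndVersion() > 0 {
		return false, 0, fmt.Errorf("the storage has been used for full backup")
	}
	// Reject log files written by a different cluster.
	if meta.GetClusterId() > 0 && meta.GetClusterId() != clusterID {
		return false, 0, fmt.Errorf("cluster ID mismatch: log %d vs current %d",
			meta.GetClusterId(), clusterID)
	}
	// The real code resumes from logInfo.logMaxTS (the global checkpoint,
	// adjusted by the truncate safe point); StartVersion is a simplification here.
	return true, meta.GetStartVersion(), nil
}

func main() {
	ctx := context.Background()
	s, err := storage.NewLocalStorage("/tmp/log-backup-dir") // hypothetical path
	if err != nil {
		panic(err)
	}
	restart, ts, err := decideRestart(ctx, s, 1) // hypothetical cluster ID
	fmt.Println(restart, ts, err)
}

Under these assumptions a clean directory behaves exactly as before the patch, while a directory left behind by a stopped task resumes from the recovered checkpoint rather than erroring out on the existing lock file, which is what makes stop-then-restart possible.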