log-backup: support restart task after stop a log-backup task (#38381)
close #38382
joccau authored Oct 27, 2022
1 parent ec93697 commit c4316fb
Showing 6 changed files with 152 additions and 44 deletions.
26 changes: 21 additions & 5 deletions br/pkg/backup/client.go
@@ -183,13 +183,17 @@ func (bc *Client) GetStorage() storage.ExternalStorage {
return bc.storage
}

// SetStorage set ExternalStorage for client.
func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
var err error
bc.storage, err = storage.New(ctx, backend, opts)
// SetStorageAndCheckNotInUse sets ExternalStorage for client and checks that the storage is not in use by others.
func (bc *Client) SetStorageAndCheckNotInUse(
ctx context.Context,
backend *backuppb.StorageBackend,
opts *storage.ExternalStorageOptions,
) error {
err := bc.SetStorage(ctx, backend, opts)
if err != nil {
return errors.Trace(err)
}

// backupmeta already exists
exist, err := bc.storage.FileExists(ctx, metautil.MetaFile)
if err != nil {
@@ -204,10 +208,22 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
if err != nil {
return err
}
bc.backend = backend
return nil
}

// SetStorage sets ExternalStorage for client.
func (bc *Client) SetStorage(
ctx context.Context,
backend *backuppb.StorageBackend,
opts *storage.ExternalStorageOptions,
) error {
var err error

bc.backend = backend
bc.storage, err = storage.New(ctx, backend, opts)
return errors.Trace(err)
}

// GetClusterID returns the cluster ID of the tidb cluster to backup.
func (bc *Client) GetClusterID() uint64 {
return bc.clusterID
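For callers, this change splits the old single entry point in two. The sketch below is not part of the commit; it only illustrates the choice a caller now makes. It assumes the br/pkg/backup, br/pkg/storage, and kvproto brpb packages are imported, and the helper name bindStorage is purely illustrative.

func bindStorage(
	ctx context.Context,
	client *backup.Client,
	backend *backuppb.StorageBackend,
	opts *storage.ExternalStorageOptions,
	isStreamTask bool,
) error {
	if isStreamTask {
		// Log-backup tasks only bind the storage; the in-use check is
		// skipped so a stopped task can be restarted against the same
		// directory.
		return client.SetStorage(ctx, backend, opts)
	}
	// Snapshot backups keep the old behaviour and refuse a storage that
	// already holds a backupmeta or lock file.
	return client.SetStorageAndCheckNotInUse(ctx, backend, opts)
}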
2 changes: 1 addition & 1 deletion br/pkg/task/backup.go
@@ -329,7 +329,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
SendCredentials: cfg.SendCreds,
CheckS3ObjectLockOptions: true,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil {
return errors.Trace(err)
}
err = client.SetLockFile(ctx)
2 changes: 1 addition & 1 deletion br/pkg/task/backup_ebs.go
@@ -120,7 +120,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
}
if err = client.SetStorage(ctx, backend, &opts); err != nil {
if err = client.SetStorageAndCheckNotInUse(ctx, backend, &opts); err != nil {
return errors.Trace(err)
}
err = client.SetLockFile(ctx)
2 changes: 1 addition & 1 deletion br/pkg/task/backup_raw.go
@@ -153,7 +153,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
SendCredentials: cfg.SendCreds,
CheckS3ObjectLockOptions: true,
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil {
return errors.Trace(err)
}

115 changes: 79 additions & 36 deletions br/pkg/task/stream.go
@@ -310,6 +310,11 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS
mgr: mgr,
}
if isStreamStart {
client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
return nil, errors.Trace(err)
}

backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
if err != nil {
return nil, errors.Trace(err)
@@ -319,11 +324,6 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
}
client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
return nil, errors.Trace(err)
}

if err = client.SetStorage(ctx, backend, &opts); err != nil {
return nil, errors.Trace(err)
}
@@ -339,6 +339,10 @@ func (s *streamMgr) close() {
s.mgr.Close()
}

func (s *streamMgr) checkLock(ctx context.Context) (bool, error) {
return s.bc.GetStorage().FileExists(ctx, metautil.LockFile)
}

func (s *streamMgr) setLock(ctx context.Context) error {
return s.bc.SetLockFile(ctx)
}
@@ -355,7 +359,7 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error {
s.cfg.StartTS = currentTS
}

if currentTS < s.cfg.StartTS || s.cfg.EndTS <= currentTS {
if currentTS < s.cfg.StartTS {
return errors.Annotatef(berrors.ErrInvalidArgument,
"invalid timestamps, startTS %d should be smaller than currentTS %d",
s.cfg.StartTS, currentTS)
@@ -438,6 +442,28 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
return nil
}

func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
se, err := g.CreateSession(s.mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
supportStream, err := utils.IsLogBackupEnabled(execCtx)
if err != nil {
return errors.Trace(err)
}
if !supportStream {
return errors.New("Unable to create task about log-backup. " +
"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
}
if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
return errors.Annotate(berrors.ErrUnknown,
"Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
}

return nil
}

// RunStreamCommand run all kinds of `stream task`
func RunStreamCommand(
ctx context.Context,
@@ -488,38 +514,13 @@ func RunStreamStart(
}
defer streamMgr.close()

se, err := g.CreateSession(streamMgr.mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
supportStream, err := utils.IsLogBackupEnabled(execCtx)
if err != nil {
if err = streamMgr.checkStreamStartEnable(g); err != nil {
return errors.Trace(err)
}
if !supportStream {
return errors.New("Unable to create task about log-backup. " +
"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
}
if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
return errors.Annotate(berrors.ErrUnknown, "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
}

if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
return errors.Trace(err)
}

if err = streamMgr.setGCSafePoint(
ctx,
utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: cfg.SafePointTTL,
BackupTS: cfg.StartTS,
},
); err != nil {
return errors.Trace(err)
}

cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// It supports single stream log task currently.
if count, err := cli.GetTaskCount(ctx); err != nil {
@@ -528,12 +529,50 @@ func RunStreamStart(
return errors.Annotate(berrors.ErrStreamLogTaskExist, "It supports single stream log task currently")
}

if err = streamMgr.setLock(ctx); err != nil {
exist, err := streamMgr.checkLock(ctx)
if err != nil {
return errors.Trace(err)
}
// If exist is true, an existing stream task is being restarted; otherwise a new stream task is created.
if exist {
logInfo, err := getLogRange(ctx, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
if logInfo.clusterID > 0 && logInfo.clusterID != streamMgr.bc.GetClusterID() {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the stream log files from cluster ID:%v and current cluster ID:%v ",
logInfo.clusterID, streamMgr.bc.GetClusterID())
}

if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
return errors.Trace(err)
cfg.StartTS = logInfo.logMaxTS
if err = streamMgr.setGCSafePoint(
ctx,
utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: cfg.SafePointTTL,
BackupTS: cfg.StartTS,
},
); err != nil {
return errors.Trace(err)
}
} else {
if err = streamMgr.setGCSafePoint(
ctx,
utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: cfg.SafePointTTL,
BackupTS: cfg.StartTS,
},
); err != nil {
return errors.Trace(err)
}
if err = streamMgr.setLock(ctx); err != nil {
return errors.Trace(err)
}
if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
return errors.Trace(err)
}
}

ranges, err := streamMgr.buildObserveRanges(ctx)
Expand All @@ -559,7 +598,6 @@ func RunStreamStart(
Ranges: ranges,
Pausing: false,
}

if err = cli.PutTask(ctx, ti); err != nil {
return errors.Trace(err)
}
@@ -1381,6 +1419,11 @@ func getLogRange(
if err = backupMeta.Unmarshal(metaData); err != nil {
return backupLogInfo{}, errors.Trace(err)
}
// endVersion > 0 indicates that the storage has been used for `br backup`
if backupMeta.GetEndVersion() > 0 {
return backupLogInfo{}, errors.Annotate(berrors.ErrStorageUnknown,
"the storage has been used for full backup")
}
logStartTS := backupMeta.GetStartVersion()

// truncateTS: get log truncate ts from TruncateSafePointFileName.
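The restart branch added to RunStreamStart can be condensed into a short helper. The following is a sketch rather than code from the commit; resumeStartTS is a hypothetical name, and it reuses the streamMgr, getLogRange, and backupLogInfo symbols shown in the diff above.

func resumeStartTS(ctx context.Context, s *streamMgr, cfg *StreamConfig) (uint64, error) {
	// getLogRange reads the backupmeta in the target storage; it now also
	// rejects storages whose EndVersion > 0, i.e. directories already used
	// for a full backup.
	logInfo, err := getLogRange(ctx, &cfg.Config)
	if err != nil {
		return 0, errors.Trace(err)
	}
	// Refuse to resume on top of log files written by a different cluster.
	if logInfo.clusterID > 0 && logInfo.clusterID != s.bc.GetClusterID() {
		return 0, errors.Annotatef(berrors.ErrInvalidArgument,
			"log files in the storage were written by cluster %v, but the current cluster is %v",
			logInfo.clusterID, s.bc.GetClusterID())
	}
	// Resume from the max TS already recorded in the log storage, so the
	// restarted task continues where the stopped one left off.
	return logInfo.logMaxTS, nil
}

The caller would then assign the returned TS to cfg.StartTS and move the GC safe point forward, as both branches of the diff do.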
49 changes: 49 additions & 0 deletions br/pkg/task/stream_test.go
@@ -21,8 +21,11 @@ import (
"path/filepath"
"testing"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/stretchr/testify/require"
@@ -261,3 +264,49 @@ func TestGetGlobalCheckpointFromStorage(t *testing.T) {
require.Nil(t, err)
require.Equal(t, ts, uint64(99))
}

func TestGetLogRangeWithFullBackupDir(t *testing.T) {
var fullBackupTS uint64 = 123456
testDir := t.TempDir()
storage, err := storage.NewLocalStorage(testDir)
require.Nil(t, err)

m := backuppb.BackupMeta{
EndVersion: fullBackupTS,
}
data, err := proto.Marshal(&m)
require.Nil(t, err)

err = storage.WriteFile(context.TODO(), metautil.MetaFile, data)
require.Nil(t, err)

cfg := Config{
Storage: testDir,
}
_, err = getLogRange(context.TODO(), &cfg)
require.Error(t, err, errors.Annotate(berrors.ErrStorageUnknown,
"the storage has been used for full backup"))
}

func TestGetLogRangeWithLogBackupDir(t *testing.T) {
var startLogBackupTS uint64 = 123456
testDir := t.TempDir()
storage, err := storage.NewLocalStorage(testDir)
require.Nil(t, err)

m := backuppb.BackupMeta{
StartVersion: startLogBackupTS,
}
data, err := proto.Marshal(&m)
require.Nil(t, err)

err = storage.WriteFile(context.TODO(), metautil.MetaFile, data)
require.Nil(t, err)

cfg := Config{
Storage: testDir,
}
logInfo, err := getLogRange(context.TODO(), &cfg)
require.Nil(t, err)
require.Equal(t, logInfo.logMinTS, startLogBackupTS)
}
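An additional test along the same lines (not part of this commit) could cover the data that the cluster-ID guard in RunStreamStart relies on. It is only a sketch, and it assumes that backuppb.BackupMeta carries a ClusterId field and that getLogRange copies it into backupLogInfo.clusterID.

func TestGetLogRangeWithClusterID(t *testing.T) {
	var clusterID uint64 = 42
	testDir := t.TempDir()
	storage, err := storage.NewLocalStorage(testDir)
	require.Nil(t, err)

	// Write a backupmeta that looks like a log-backup directory produced by
	// another cluster.
	m := backuppb.BackupMeta{
		ClusterId:    clusterID,
		StartVersion: 123456,
	}
	data, err := proto.Marshal(&m)
	require.Nil(t, err)
	err = storage.WriteFile(context.TODO(), metautil.MetaFile, data)
	require.Nil(t, err)

	cfg := Config{
		Storage: testDir,
	}
	logInfo, err := getLogRange(context.TODO(), &cfg)
	require.Nil(t, err)
	require.Equal(t, clusterID, logInfo.clusterID)
}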
