Skip to content

Commit

Permalink
log-backup: refuse PiTR if the full-snapshot and stream-log do not co…
Browse files Browse the repository at this point in the history
…me from the same cluster (#37546)

close #29501
  • Loading branch information
joccau authored Sep 15, 2022
1 parent c677b2a commit fae88ae
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 25 deletions.
6 changes: 6 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error {
if cfg.FullBackupStorage, err = flags.GetString(FlagStreamFullBackupStorage); err != nil {
return errors.Trace(err)
}

if cfg.StartTS > 0 && len(cfg.FullBackupStorage) > 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "%v and %v are mutually exclusive",
FlagStreamStartTS, FlagStreamFullBackupStorage)
}

return nil
}

Expand Down
75 changes: 50 additions & 25 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,17 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro
}

func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
clusterVersion, err := s.mgr.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}

metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil)
metaWriter.Update(func(m *backuppb.BackupMeta) {
// save log startTS to backupmeta file
m.StartVersion = s.cfg.StartTS
m.ClusterId = s.bc.GetClusterID()
m.ClusterVersion = clusterVersion
})

schemas, err := backup.BuildFullSchema(s.mgr.GetStorage(), s.cfg.StartTS)
Expand Down Expand Up @@ -570,16 +577,16 @@ func RunStreamMetadata(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

logMinTS, logMaxTS, err := getLogRange(ctx, &cfg.Config)
logInfo, err := getLogRange(ctx, &cfg.Config)
if err != nil {
return errors.Trace(err)
}

logMinDate := stream.FormatDate(oracle.GetTimeFromTS(logMinTS))
logMaxDate := stream.FormatDate(oracle.GetTimeFromTS(logMaxTS))
summary.Log(cmdName, zap.Uint64("log-min-ts", logMinTS),
logMinDate := stream.FormatDate(oracle.GetTimeFromTS(logInfo.logMinTS))
logMaxDate := stream.FormatDate(oracle.GetTimeFromTS(logInfo.logMaxTS))
summary.Log(cmdName, zap.Uint64("log-min-ts", logInfo.logMinTS),
zap.String("log-min-date", logMinDate),
zap.Uint64("log-max-ts", logMaxTS),
zap.Uint64("log-max-ts", logInfo.logMaxTS),
zap.String("log-max-date", logMaxDate),
)
return nil
Expand Down Expand Up @@ -979,30 +986,38 @@ func RunStreamRestore(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

logMinTS, logMaxTS, err := getLogRange(ctx, &cfg.Config)
logInfo, err := getLogRange(ctx, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
if cfg.RestoreTS == 0 {
cfg.RestoreTS = logMaxTS
cfg.RestoreTS = logInfo.logMaxTS
}

if len(cfg.FullBackupStorage) > 0 {
if cfg.StartTS, err = getFullBackupTS(ctx, cfg); err != nil {
startTS, fullClusterID, err := getFullBackupTS(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
if cfg.StartTS < logMinTS {
if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the full snapshot(from cluster ID:%v) and log(from cluster ID:%v) come from different cluster.",
fullClusterID, logInfo.clusterID)
}

cfg.StartTS = startTS
if cfg.StartTS < logInfo.logMinTS {
return errors.Annotatef(berrors.ErrInvalidArgument,
"it has gap between full backup ts:%d(%s) and log backup ts:%d(%s). ",
cfg.StartTS, oracle.GetTimeFromTS(cfg.StartTS),
logMinTS, oracle.GetTimeFromTS(logMinTS))
logInfo.logMinTS, oracle.GetTimeFromTS(logInfo.logMinTS))
}
}

log.Info("start restore on point",
zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS),
zap.Uint64("log-min-ts", logMinTS), zap.Uint64("log-max-ts", logMaxTS))
if err := checkLogRange(cfg.StartTS, cfg.RestoreTS, logMinTS, logMaxTS); err != nil {
zap.Uint64("log-min-ts", logInfo.logMinTS), zap.Uint64("log-max-ts", logInfo.logMaxTS))
if err := checkLogRange(cfg.StartTS, cfg.RestoreTS, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
return errors.Trace(err)
}

Expand All @@ -1023,7 +1038,7 @@ func RunStreamRestore(
}
// restore log.
cfg.adjustRestoreConfigForStreamRestore()
if err := restoreStream(ctx, g, cfg, logMinTS, logMaxTS); err != nil {
if err := restoreStream(ctx, g, cfg, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -1287,43 +1302,53 @@ func countIndices(ts map[int64]*metautil.Table) int64 {
return result
}

type backupLogInfo struct {
logMaxTS uint64
logMinTS uint64
clusterID uint64
}

// getLogRange gets the log-min-ts and log-max-ts of starting log backup.
func getLogRange(
ctx context.Context,
cfg *Config,
) (uint64, uint64, error) {
) (backupLogInfo, error) {
_, s, err := GetStorage(ctx, cfg.Storage, cfg)
if err != nil {
return 0, 0, errors.Trace(err)
return backupLogInfo{}, errors.Trace(err)
}

// logStartTS: Get log start ts from backupmeta file.
metaData, err := s.ReadFile(ctx, metautil.MetaFile)
if err != nil {
return 0, 0, errors.Trace(err)
return backupLogInfo{}, errors.Trace(err)
}
backupMeta := &backuppb.BackupMeta{}
if err = backupMeta.Unmarshal(metaData); err != nil {
return 0, 0, errors.Trace(err)
return backupLogInfo{}, errors.Trace(err)
}
logStartTS := backupMeta.GetStartVersion()

// truncateTS: get log truncate ts from TruncateSafePointFileName.
// If truncateTS equals 0, which represents the stream log has never been truncated.
truncateTS, err := restore.GetTSFromFile(ctx, s, restore.TruncateSafePointFileName)
if err != nil {
return 0, 0, errors.Trace(err)
return backupLogInfo{}, errors.Trace(err)
}
logMinTS := mathutil.Max(logStartTS, truncateTS)

// get max global resolved ts from metas.
logMaxTS, err := getGlobalCheckpointFromStorage(ctx, s)
if err != nil {
return 0, 0, errors.Trace(err)
return backupLogInfo{}, errors.Trace(err)
}
logMaxTS = mathutil.Max(logMinTS, logMaxTS)

return logMinTS, logMaxTS, nil
return backupLogInfo{
logMaxTS: logMaxTS,
logMinTS: logMinTS,
clusterID: backupMeta.ClusterId,
}, nil
}

func getGlobalCheckpointFromStorage(ctx context.Context, s storage.ExternalStorage) (uint64, error) {
Expand All @@ -1349,23 +1374,23 @@ func getGlobalCheckpointFromStorage(ctx context.Context, s storage.ExternalStora
func getFullBackupTS(
ctx context.Context,
cfg *RestoreConfig,
) (uint64, error) {
) (uint64, uint64, error) {
_, s, err := GetStorage(ctx, cfg.FullBackupStorage, &cfg.Config)
if err != nil {
return 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

metaData, err := s.ReadFile(ctx, metautil.MetaFile)
if err != nil {
return 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

backupmeta := &backuppb.BackupMeta{}
if err = backupmeta.Unmarshal(metaData); err != nil {
return 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

return backupmeta.GetEndVersion(), nil
return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
}

func getGlobalResolvedTS(
Expand Down

0 comments on commit fae88ae

Please sign in to comment.