diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 069073b11618d..ceda689bbb2f6 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -210,6 +210,12 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error { if cfg.FullBackupStorage, err = flags.GetString(FlagStreamFullBackupStorage); err != nil { return errors.Trace(err) } + + if cfg.StartTS > 0 && len(cfg.FullBackupStorage) > 0 { + return errors.Annotatef(berrors.ErrInvalidArgument, "%v and %v are mutually exclusive", + FlagStreamStartTS, FlagStreamFullBackupStorage) + } + return nil } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index e03c00eae6fe9..e59bb02a6dc68 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -403,10 +403,17 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro } func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error { + clusterVersion, err := s.mgr.GetClusterVersion(ctx) + if err != nil { + return errors.Trace(err) + } + metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil) metaWriter.Update(func(m *backuppb.BackupMeta) { // save log startTS to backupmeta file m.StartVersion = s.cfg.StartTS + m.ClusterId = s.bc.GetClusterID() + m.ClusterVersion = clusterVersion }) schemas, err := backup.BuildFullSchema(s.mgr.GetStorage(), s.cfg.StartTS) @@ -570,16 +577,16 @@ func RunStreamMetadata( ctx = opentracing.ContextWithSpan(ctx, span1) } - logMinTS, logMaxTS, err := getLogRange(ctx, &cfg.Config) + logInfo, err := getLogRange(ctx, &cfg.Config) if err != nil { return errors.Trace(err) } - logMinDate := stream.FormatDate(oracle.GetTimeFromTS(logMinTS)) - logMaxDate := stream.FormatDate(oracle.GetTimeFromTS(logMaxTS)) - summary.Log(cmdName, zap.Uint64("log-min-ts", logMinTS), + logMinDate := stream.FormatDate(oracle.GetTimeFromTS(logInfo.logMinTS)) + logMaxDate := stream.FormatDate(oracle.GetTimeFromTS(logInfo.logMaxTS)) + summary.Log(cmdName, zap.Uint64("log-min-ts", logInfo.logMinTS), zap.String("log-min-date", logMinDate), - zap.Uint64("log-max-ts", logMaxTS), + zap.Uint64("log-max-ts", logInfo.logMaxTS), zap.String("log-max-date", logMaxDate), ) return nil @@ -979,30 +986,38 @@ func RunStreamRestore( ctx = opentracing.ContextWithSpan(ctx, span1) } - logMinTS, logMaxTS, err := getLogRange(ctx, &cfg.Config) + logInfo, err := getLogRange(ctx, &cfg.Config) if err != nil { return errors.Trace(err) } if cfg.RestoreTS == 0 { - cfg.RestoreTS = logMaxTS + cfg.RestoreTS = logInfo.logMaxTS } if len(cfg.FullBackupStorage) > 0 { - if cfg.StartTS, err = getFullBackupTS(ctx, cfg); err != nil { + startTS, fullClusterID, err := getFullBackupTS(ctx, cfg) + if err != nil { return errors.Trace(err) } - if cfg.StartTS < logMinTS { + if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID { + return errors.Annotatef(berrors.ErrInvalidArgument, + "the full snapshot(from cluster ID:%v) and log(from cluster ID:%v) come from different cluster.", + fullClusterID, logInfo.clusterID) + } + + cfg.StartTS = startTS + if cfg.StartTS < logInfo.logMinTS { return errors.Annotatef(berrors.ErrInvalidArgument, "it has gap between full backup ts:%d(%s) and log backup ts:%d(%s). ", cfg.StartTS, oracle.GetTimeFromTS(cfg.StartTS), - logMinTS, oracle.GetTimeFromTS(logMinTS)) + logInfo.logMinTS, oracle.GetTimeFromTS(logInfo.logMinTS)) } } log.Info("start restore on point", zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS), - zap.Uint64("log-min-ts", logMinTS), zap.Uint64("log-max-ts", logMaxTS)) - if err := checkLogRange(cfg.StartTS, cfg.RestoreTS, logMinTS, logMaxTS); err != nil { + zap.Uint64("log-min-ts", logInfo.logMinTS), zap.Uint64("log-max-ts", logInfo.logMaxTS)) + if err := checkLogRange(cfg.StartTS, cfg.RestoreTS, logInfo.logMinTS, logInfo.logMaxTS); err != nil { return errors.Trace(err) } @@ -1023,7 +1038,7 @@ func RunStreamRestore( } // restore log. cfg.adjustRestoreConfigForStreamRestore() - if err := restoreStream(ctx, g, cfg, logMinTS, logMaxTS); err != nil { + if err := restoreStream(ctx, g, cfg, logInfo.logMinTS, logInfo.logMaxTS); err != nil { return errors.Trace(err) } return nil @@ -1287,24 +1302,30 @@ func countIndices(ts map[int64]*metautil.Table) int64 { return result } +type backupLogInfo struct { + logMaxTS uint64 + logMinTS uint64 + clusterID uint64 +} + // getLogRange gets the log-min-ts and log-max-ts of starting log backup. func getLogRange( ctx context.Context, cfg *Config, -) (uint64, uint64, error) { +) (backupLogInfo, error) { _, s, err := GetStorage(ctx, cfg.Storage, cfg) if err != nil { - return 0, 0, errors.Trace(err) + return backupLogInfo{}, errors.Trace(err) } // logStartTS: Get log start ts from backupmeta file. metaData, err := s.ReadFile(ctx, metautil.MetaFile) if err != nil { - return 0, 0, errors.Trace(err) + return backupLogInfo{}, errors.Trace(err) } backupMeta := &backuppb.BackupMeta{} if err = backupMeta.Unmarshal(metaData); err != nil { - return 0, 0, errors.Trace(err) + return backupLogInfo{}, errors.Trace(err) } logStartTS := backupMeta.GetStartVersion() @@ -1312,18 +1333,22 @@ func getLogRange( // If truncateTS equals 0, which represents the stream log has never been truncated. truncateTS, err := restore.GetTSFromFile(ctx, s, restore.TruncateSafePointFileName) if err != nil { - return 0, 0, errors.Trace(err) + return backupLogInfo{}, errors.Trace(err) } logMinTS := mathutil.Max(logStartTS, truncateTS) // get max global resolved ts from metas. logMaxTS, err := getGlobalCheckpointFromStorage(ctx, s) if err != nil { - return 0, 0, errors.Trace(err) + return backupLogInfo{}, errors.Trace(err) } logMaxTS = mathutil.Max(logMinTS, logMaxTS) - return logMinTS, logMaxTS, nil + return backupLogInfo{ + logMaxTS: logMaxTS, + logMinTS: logMinTS, + clusterID: backupMeta.ClusterId, + }, nil } func getGlobalCheckpointFromStorage(ctx context.Context, s storage.ExternalStorage) (uint64, error) { @@ -1349,23 +1374,23 @@ func getGlobalCheckpointFromStorage(ctx context.Context, s storage.ExternalStora func getFullBackupTS( ctx context.Context, cfg *RestoreConfig, -) (uint64, error) { +) (uint64, uint64, error) { _, s, err := GetStorage(ctx, cfg.FullBackupStorage, &cfg.Config) if err != nil { - return 0, errors.Trace(err) + return 0, 0, errors.Trace(err) } metaData, err := s.ReadFile(ctx, metautil.MetaFile) if err != nil { - return 0, errors.Trace(err) + return 0, 0, errors.Trace(err) } backupmeta := &backuppb.BackupMeta{} if err = backupmeta.Unmarshal(metaData); err != nil { - return 0, errors.Trace(err) + return 0, 0, errors.Trace(err) } - return backupmeta.GetEndVersion(), nil + return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil } func getGlobalResolvedTS(