diff --git a/br/cmd/br/backup.go b/br/cmd/br/backup.go index 925cf58c132b4..6525c85d535b9 100644 --- a/br/cmd/br/backup.go +++ b/br/cmd/br/backup.go @@ -22,10 +22,11 @@ import ( func runBackupCommand(command *cobra.Command, cmdName string) error { cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { + if err := cfg.ParseFromFlags(command.Flags(), false); err != nil { command.SilenceUsage = false return errors.Trace(err) } + overrideDefaultBackupConfigIfNeeded(&cfg, command) if err := metricsutil.RegisterMetricsForBR(cfg.PD, cfg.KeyspaceName); err != nil { return errors.Trace(err) @@ -211,3 +212,10 @@ func newTxnBackupCommand() *cobra.Command { task.DefineTxnBackupFlags(command) return command } + +func overrideDefaultBackupConfigIfNeeded(config *task.BackupConfig, cmd *cobra.Command) { + // override only if flag not set by user + if !cmd.Flags().Changed(task.FlagChecksum) { + config.Checksum = false + } +} diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index 02939c78b4676..955553c137ae9 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -86,8 +86,8 @@ func timestampLogFileName() string { return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700")) } -// AddFlags adds flags to the given cmd. -func AddFlags(cmd *cobra.Command) { +// DefineCommonFlags defines the common flags for all BR cmd operation. +func DefineCommonFlags(cmd *cobra.Command) { cmd.Version = build.Info() cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR") cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n") @@ -104,6 +104,8 @@ func AddFlags(cmd *cobra.Command) { "Set whether to redact sensitive info in log") cmd.PersistentFlags().String(FlagStatusAddr, "", "Set the HTTP listening address for the status report service. Set to empty string to disable") + + // defines BR task common flags, this is shared by cmd and sql(brie) task.DefineCommonFlags(cmd.PersistentFlags()) cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "", diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index f745920f5bfba..cad081606a0ea 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -20,7 +20,7 @@ func main() { TraverseChildren: true, SilenceUsage: true, } - AddFlags(rootCmd) + DefineCommonFlags(rootCmd) SetDefaultContext(ctx) rootCmd.AddCommand( NewDebugCommand(), diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go index f991163813af2..2121105761a36 100644 --- a/br/cmd/br/restore.go +++ b/br/cmd/br/restore.go @@ -25,7 +25,7 @@ import ( func runRestoreCommand(command *cobra.Command, cmdName string) error { cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { + if err := cfg.ParseFromFlags(command.Flags(), false); err != nil { command.SilenceUsage = false return errors.Trace(err) } diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go index ac7bc98258a19..ecb146c03f145 100644 --- a/br/pkg/backup/schema.go +++ b/br/pkg/backup/schema.go @@ -106,7 +106,7 @@ func (ss *Schemas) BackupSchemas( } var checksum *checkpoint.ChecksumItem - var exists bool = false + var exists = false if ss.checkpointChecksum != nil && schema.tableInfo != nil { checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID] } @@ -145,7 +145,7 @@ func (ss *Schemas) BackupSchemas( zap.Uint64("Crc64Xor", schema.crc64xor), zap.Uint64("TotalKvs", schema.totalKvs), zap.Uint64("TotalBytes", schema.totalBytes), - zap.Duration("calculate-take", calculateCost)) + zap.Duration("TimeTaken", calculateCost)) } } if statsHandle != nil { diff --git a/br/pkg/metautil/metafile.go b/br/pkg/metautil/metafile.go index 7b146f380243b..b39f12c278062 100644 --- a/br/pkg/metautil/metafile.go +++ b/br/pkg/metautil/metafile.go @@ -166,11 +166,6 @@ type Table struct { StatsFileIndexes []*backuppb.StatsFileIndex } -// NoChecksum checks whether the table has a calculated checksum. -func (tbl *Table) NoChecksum() bool { - return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0 -} - // MetaReader wraps a reader to read both old and new version of backupmeta. type MetaReader struct { storage storage.ExternalStorage @@ -235,7 +230,7 @@ func (reader *MetaReader) readDataFiles(ctx context.Context, output func(*backup } // ArchiveSize return the size of Archive data -func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 { +func ArchiveSize(files []*backuppb.File) uint64 { total := uint64(0) for _, file := range files { total += file.Size_ @@ -243,6 +238,30 @@ func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 return total } +type ChecksumStats struct { + Crc64Xor uint64 + TotalKvs uint64 + TotalBytes uint64 +} + +func (stats ChecksumStats) ChecksumExists() bool { + if stats.Crc64Xor == 0 && stats.TotalKvs == 0 && stats.TotalBytes == 0 { + return false + } + return true +} + +// CalculateChecksumStatsOnFiles returns the ChecksumStats for the given files +func CalculateChecksumStatsOnFiles(files []*backuppb.File) ChecksumStats { + var stats ChecksumStats + for _, file := range files { + stats.Crc64Xor ^= file.Crc64Xor + stats.TotalKvs += file.TotalKvs + stats.TotalBytes += file.TotalBytes + } + return stats +} + // ReadDDLs reads the ddls from the backupmeta. // This function is compatible with the old backupmeta. func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) { diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index d0f68157ff410..a685d31847a55 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -467,8 +467,8 @@ func (rc *SnapClient) needLoadSchemas(backupMeta *backuppb.BackupMeta) bool { return !(backupMeta.IsRawKv || backupMeta.IsTxnKv) } -// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient. -func (rc *SnapClient) InitBackupMeta( +// LoadSchemaIfNeededAndInitClient loads schemas from BackupMeta to initialize RestoreClient. +func (rc *SnapClient) LoadSchemaIfNeededAndInitClient( c context.Context, backupMeta *backuppb.BackupMeta, backend *backuppb.StorageBackend, @@ -989,7 +989,7 @@ func (rc *SnapClient) setSpeedLimit(ctx context.Context, rateLimit uint64) error return nil } -func (rc *SnapClient) execChecksum( +func (rc *SnapClient) execAndValidateChecksum( ctx context.Context, tbl *CreatedTable, kvClient kv.Client, @@ -1000,13 +1000,14 @@ func (rc *SnapClient) execChecksum( zap.String("table", tbl.OldTable.Info.Name.O), ) - if tbl.OldTable.NoChecksum() { + expectedChecksumStats := metautil.CalculateChecksumStatsOnFiles(tbl.OldTable.Files) + if !expectedChecksumStats.ChecksumExists() { logger.Warn("table has no checksum, skipping checksum") return nil } if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.execChecksum", opentracing.ChildOf(span.Context())) + span1 := span.Tracer().StartSpan("Client.execAndValidateChecksum", opentracing.ChildOf(span.Context())) defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } @@ -1046,21 +1047,24 @@ func (rc *SnapClient) execChecksum( } } } - table := tbl.OldTable - if item.Crc64xor != table.Crc64Xor || - item.TotalKvs != table.TotalKvs || - item.TotalBytes != table.TotalBytes { + checksumMatch := item.Crc64xor == expectedChecksumStats.Crc64Xor && + item.TotalKvs == expectedChecksumStats.TotalKvs && + item.TotalBytes == expectedChecksumStats.TotalBytes + failpoint.Inject("full-restore-validate-checksum", func(_ failpoint.Value) { + checksumMatch = false + }) + if !checksumMatch { logger.Error("failed in validate checksum", - zap.Uint64("origin tidb crc64", table.Crc64Xor), + zap.Uint64("expected tidb crc64", expectedChecksumStats.Crc64Xor), zap.Uint64("calculated crc64", item.Crc64xor), - zap.Uint64("origin tidb total kvs", table.TotalKvs), + zap.Uint64("expected tidb total kvs", expectedChecksumStats.TotalKvs), zap.Uint64("calculated total kvs", item.TotalKvs), - zap.Uint64("origin tidb total bytes", table.TotalBytes), + zap.Uint64("expected tidb total bytes", expectedChecksumStats.TotalBytes), zap.Uint64("calculated total bytes", item.TotalBytes), ) return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum") } - logger.Info("success in validate checksum") + logger.Info("success in validating checksum") return nil } diff --git a/br/pkg/restore/snap_client/pipeline_items.go b/br/pkg/restore/snap_client/pipeline_items.go index f76417a636c4a..3f74434e72f02 100644 --- a/br/pkg/restore/snap_client/pipeline_items.go +++ b/br/pkg/restore/snap_client/pipeline_items.go @@ -166,7 +166,7 @@ func (rc *SnapClient) GoValidateChecksum( elapsed := time.Since(start) summary.CollectSuccessUnit("table checksum", 1, elapsed) }() - err := rc.execChecksum(c, tbl, kvClient, concurrency) + err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index ab77b59bdc7a3..af92518043a90 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -42,7 +42,6 @@ import ( "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" kvutil "github.com/tikv/client-go/v2/util" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -159,7 +158,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { } // ParseFromFlags parses the backup-related flags from the flag set. -func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { +func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error { timeAgo, err := flags.GetDuration(flagBackupTimeago) if err != nil { return errors.Trace(err) @@ -212,9 +211,13 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.CompressionConfig = *compressionCfg - if err = cfg.Config.ParseFromFlags(flags); err != nil { - return errors.Trace(err) + // parse common flags if needed + if !skipCommonConfig { + if err = cfg.Config.ParseFromFlags(flags); err != nil { + return errors.Trace(err) + } } + cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers) if err != nil { return errors.Trace(err) @@ -788,18 +791,15 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) { return oracle.GoTimeToTS(t1), nil } -func DefaultBackupConfig() BackupConfig { +func DefaultBackupConfig(commonConfig Config) BackupConfig { fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) - DefineCommonFlags(fs) DefineBackupFlags(fs) cfg := BackupConfig{} - err := multierr.Combine( - cfg.ParseFromFlags(fs), - cfg.Config.ParseFromFlags(fs), - ) + err := cfg.ParseFromFlags(fs, true) if err != nil { - log.Panic("infallible operation failed.", zap.Error(err)) + log.Panic("failed to parse backup flags to config", zap.Error(err)) } + cfg.Config = commonConfig return cfg } diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index ebb53968d5fea..08841700936e8 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -64,7 +64,7 @@ const ( flagRateLimit = "ratelimit" flagRateLimitUnit = "ratelimit-unit" flagConcurrency = "concurrency" - flagChecksum = "checksum" + FlagChecksum = "checksum" flagFilter = "filter" flagCaseSensitive = "case-sensitive" flagRemoveTiFlash = "remove-tiflash" @@ -297,7 +297,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of checksumming in one table") flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node") - flags.Bool(flagChecksum, true, "Run checksum at end of task") + flags.Bool(FlagChecksum, true, "Run checksum at end of task") flags.Bool(flagRemoveTiFlash, true, "Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash") @@ -359,7 +359,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { // HiddenFlagsForStream temporary hidden flags that stream cmd not support. func HiddenFlagsForStream(flags *pflag.FlagSet) { - _ = flags.MarkHidden(flagChecksum) + _ = flags.MarkHidden(FlagChecksum) _ = flags.MarkHidden(flagLoadStats) _ = flags.MarkHidden(flagChecksumConcurrency) _ = flags.MarkHidden(flagRateLimit) @@ -609,7 +609,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } - if cfg.Checksum, err = flags.GetBool(flagChecksum); err != nil { + if cfg.Checksum, err = flags.GetBool(FlagChecksum); err != nil { return errors.Trace(err) } if cfg.ChecksumConcurrency, err = flags.GetUint(flagChecksumConcurrency); err != nil { @@ -777,6 +777,11 @@ func (cfg *Config) parseAndValidateMasterKeyInfo(hasPlaintextKey bool, flags *pf return nil } +// OverrideDefaultForBackup override common config for backup tasks +func (cfg *Config) OverrideDefaultForBackup() { + cfg.Checksum = false +} + // NewMgr creates a new mgr at the given PD address. func NewMgr(ctx context.Context, g glue.Glue, pds []string, diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index 5979ef6eebeeb..a756aefb795e3 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -229,8 +229,10 @@ func expectedDefaultConfig() Config { } func expectedDefaultBackupConfig() BackupConfig { + defaultConfig := expectedDefaultConfig() + defaultConfig.Checksum = false return BackupConfig{ - Config: expectedDefaultConfig(), + Config: defaultConfig, GCTTL: utils.DefaultBRGCSafePointTTL, CompressionConfig: CompressionConfig{ CompressionType: backup.CompressionType_ZSTD, @@ -270,13 +272,16 @@ func TestDefault(t *testing.T) { } func TestDefaultBackup(t *testing.T) { - def := DefaultBackupConfig() + commonConfig := DefaultConfig() + commonConfig.OverrideDefaultForBackup() + def := DefaultBackupConfig(commonConfig) defaultConfig := expectedDefaultBackupConfig() require.Equal(t, defaultConfig, def) } func TestDefaultRestore(t *testing.T) { - def := DefaultRestoreConfig() + commonConfig := DefaultConfig() + def := DefaultRestoreConfig(commonConfig) defaultConfig := expectedDefaultRestoreConfig() require.Equal(t, defaultConfig, def) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 5bee261ceff21..77893aaa913ec 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -347,7 +347,7 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error { } // ParseFromFlags parses the restore-related flags from the flag set. -func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { +func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error { var err error cfg.NoSchema, err = flags.GetBool(flagNoSchema) if err != nil { @@ -357,10 +357,15 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - err = cfg.Config.ParseFromFlags(flags) - if err != nil { - return errors.Trace(err) + + // parse common config if needed + if !skipCommonConfig { + err = cfg.Config.ParseFromFlags(flags) + if err != nil { + return errors.Trace(err) + } } + err = cfg.RestoreCommonConfig.ParseFromFlags(flags) if err != nil { return errors.Trace(err) @@ -641,20 +646,16 @@ func registerTaskToPD(ctx context.Context, etcdCLI *clientv3.Client) (closeF fun return register.Close, errors.Trace(err) } -func DefaultRestoreConfig() RestoreConfig { +func DefaultRestoreConfig(commonConfig Config) RestoreConfig { fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) - DefineCommonFlags(fs) DefineRestoreFlags(fs) cfg := RestoreConfig{} - err := multierr.Combine( - cfg.ParseFromFlags(fs), - cfg.RestoreCommonConfig.ParseFromFlags(fs), - cfg.Config.ParseFromFlags(fs), - ) + err := cfg.ParseFromFlags(fs, true) if err != nil { - log.Panic("infallible failed.", zap.Error(err)) + log.Panic("failed to parse restore flags to config", zap.Error(err)) } + cfg.Config = commonConfig return cfg } @@ -801,7 +802,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, reader, cfg.LoadStats); err != nil { + if err = client.LoadSchemaIfNeededAndInitClient(c, backupMeta, u, reader, cfg.LoadStats); err != nil { return errors.Trace(err) } @@ -822,7 +823,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s } } - archiveSize := reader.ArchiveSize(ctx, files) + archiveSize := metautil.ArchiveSize(files) g.Record(summary.RestoreDataSize, archiveSize) //restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247 g.Record("Size", archiveSize) @@ -1108,8 +1109,9 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s errCh := make(chan error, 32) postHandleCh := afterTableRestoredCh(ctx, createdTables) - // pipeline checksum - if cfg.Checksum { + // pipeline checksum only when enabled and is not incremental snapshot repair mode cuz incremental doesn't have + // enough information in backup meta to validate checksum + if cfg.Checksum && !client.IsIncremental() { postHandleCh = client.GoValidateChecksum( ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency) } @@ -1124,7 +1126,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s finish := dropToBlackhole(ctx, postHandleCh, errCh) - // Reset speed limit. ResetSpeedLimit must be called after client.InitBackupMeta has been called. + // Reset speed limit. ResetSpeedLimit must be called after client.LoadSchemaIfNeededAndInitClient has been called. defer func() { var resetErr error // In future we may need a mechanism to set speed limit in ttl. like what we do in switchmode. TODO diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 13a7382d6092c..cbcda5679fb57 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -109,7 +109,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil { + if err = client.LoadSchemaIfNeededAndInitClient(c, backupMeta, u, reader, true); err != nil { return errors.Trace(err) } @@ -121,7 +121,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR if err != nil { return errors.Trace(err) } - archiveSize := reader.ArchiveSize(ctx, files) + archiveSize := metautil.ArchiveSize(files) g.Record(summary.RestoreDataSize, archiveSize) if len(files) == 0 { diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 4a4a832aad660..d686eb97e2154 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -54,7 +54,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, reader, true); err != nil { + if err = client.LoadSchemaIfNeededAndInitClient(c, backupMeta, u, reader, true); err != nil { return errors.Trace(err) } @@ -63,7 +63,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) } files := backupMeta.Files - archiveSize := reader.ArchiveSize(ctx, files) + archiveSize := metautil.ArchiveSize(files) g.Record(summary.RestoreDataSize, archiveSize) if len(files) == 0 { diff --git a/br/tests/br_file_corruption/run.sh b/br/tests/br_file_corruption/run.sh index 35a7698bb9fef..60907ac2e7a4c 100644 --- a/br/tests/br_file_corruption/run.sh +++ b/br/tests/br_file_corruption/run.sh @@ -22,33 +22,62 @@ CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) run_sql "CREATE DATABASE $DB;" go-ycsb load mysql -P $CUR/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --checksum=false -filename=$(find $TEST_DIR/$DB -regex ".*.sst" | head -n 1) -filename_temp=$filename"_temp" -filename_bak=$filename"_bak" -echo "corruption" > $filename_temp -cat $filename >> $filename_temp +# Replace the single file manipulation with a loop over all .sst files +for filename in $(find $TEST_DIR/$DB -name "*.sst"); do + filename_temp="${filename}_temp" + filename_bak="${filename}_bak" + echo "corruption" > "$filename_temp" + cat "$filename" >> "$filename_temp" + mv "$filename" "$filename_bak" +done + +# need to drop db otherwise restore will fail because of cluster not fresh but not the expected issue +run_sql "DROP DATABASE IF EXISTS $DB;" # file lost -mv $filename $filename_bak export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" || restore_fail=1 export GO_FAILPOINTS="" if [ $restore_fail -ne 1 ]; then - echo 'restore success' + echo 'expect restore to fail on file lost but succeed' exit 1 fi +run_sql "DROP DATABASE IF EXISTS $DB;" # file corruption -mv $filename_temp $filename -truncate --size=-11 $filename +for filename in $(find $TEST_DIR/$DB -name "*.sst_temp"); do + mv "$filename" "${filename%_temp}" + truncate -s -11 "${filename%_temp}" +done + export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" || restore_fail=1 export GO_FAILPOINTS="" if [ $restore_fail -ne 1 ]; then - echo 'restore success' + echo 'expect restore to fail on file corruption but succeed' + exit 1 +fi +run_sql "DROP DATABASE IF EXISTS $DB;" + +# verify validating checksum is still performed even backup didn't enable it +for filename in $(find $TEST_DIR/$DB -name "*.sst_bak"); do + mv "$filename" "${filename%_bak}" +done + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/full-restore-validate-checksum=return(true)" +restore_fail=0 +run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true || restore_fail=1 +export GO_FAILPOINTS="" +if [ $restore_fail -ne 1 ]; then + echo 'expect restore to fail on checksum mismatch but succeed' exit 1 fi +run_sql "DROP DATABASE IF EXISTS $DB;" + +# sanity check restore can succeed +run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true +echo 'file corruption tests passed' diff --git a/br/tests/br_full_ddl/run.sh b/br/tests/br_full_ddl/run.sh index 5f9d67184e7d6..70e9b544fabb5 100755 --- a/br/tests/br_full_ddl/run.sh +++ b/br/tests/br_full_ddl/run.sh @@ -125,7 +125,7 @@ echo "backup start with stats..." unset BR_LOG_TO_TERM cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk '{print $2}') -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --log-file $LOG --ignore-stats=false || cat $LOG +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --log-file $LOG --ignore-stats=false --checksum=true || cat $LOG checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) if [ "${checksum_count}" -lt "1" ];then diff --git a/br/tests/br_full_index/run.sh b/br/tests/br_full_index/run.sh index edcac1bfa2377..28f959c10b5f4 100755 --- a/br/tests/br_full_index/run.sh +++ b/br/tests/br_full_index/run.sh @@ -41,7 +41,7 @@ echo "backup start..." # Do not log to terminal unset BR_LOG_TO_TERM # do not backup stats to test whether we can restore without stats. -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ignore-stats=true --log-file $LOG || cat $LOG +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ignore-stats=true --log-file $LOG --checksum=true || cat $LOG BR_LOG_TO_TERM=1 checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) diff --git a/pkg/executor/brie.go b/pkg/executor/brie.go index c0ec6528f50b9..c9b5a759cbaef 100644 --- a/pkg/executor/brie.go +++ b/pkg/executor/brie.go @@ -283,7 +283,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Key: tidbCfg.Security.ClusterSSLKey, } pds := strings.Split(tidbCfg.Path, ",") + + // build common config and override for specific task if needed cfg := task.DefaultConfig() + switch s.Kind { + case ast.BRIEKindBackup: + cfg.OverrideDefaultForBackup() + default: + } + cfg.PD = pds cfg.TLS = tlsCfg @@ -384,8 +392,7 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) switch s.Kind { case ast.BRIEKindBackup: - bcfg := task.DefaultBackupConfig() - bcfg.Config = cfg + bcfg := task.DefaultBackupConfig(cfg) e.backupCfg = &bcfg for _, opt := range s.Options { @@ -430,8 +437,7 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } case ast.BRIEKindRestore: - rcfg := task.DefaultRestoreConfig() - rcfg.Config = cfg + rcfg := task.DefaultRestoreConfig(cfg) e.restoreCfg = &rcfg for _, opt := range s.Options { switch opt.Tp { diff --git a/pkg/executor/brie_test.go b/pkg/executor/brie_test.go index 7bfc014a2d215..305cd74417755 100644 --- a/pkg/executor/brie_test.go +++ b/pkg/executor/brie_test.go @@ -147,7 +147,7 @@ func TestFetchShowBRIE(t *testing.T) { require.Equal(t, info2Res, fetchShowBRIEResult(t, e, brieColTypes)) } -func TestBRIEBuilderOPtions(t *testing.T) { +func TestBRIEBuilderOptions(t *testing.T) { sctx := mock.NewContext() sctx.GetSessionVars().User = &auth.UserIdentity{Username: "test"} is := infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable()}) @@ -156,9 +156,10 @@ func TestBRIEBuilderOPtions(t *testing.T) { ctx := context.Background() p := parser.New() p.SetParserConfig(parser.ParserConfig{EnableWindowFunction: true, EnableStrictDoubleTypeCheck: true}) - failpoint.Enable("github.com/pingcap/tidb/pkg/executor/modifyStore", `return("tikv")`) + err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/modifyStore", `return("tikv")`) + require.NoError(t, err) defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/modifyStore") - err := os.WriteFile("/tmp/keyfile", []byte(strings.Repeat("A", 128)), 0644) + err = os.WriteFile("/tmp/keyfile", []byte(strings.Repeat("A", 128)), 0644) require.NoError(t, err) stmt, err := p.ParseOneStmt("BACKUP TABLE `a` TO 'noop://' CHECKSUM_CONCURRENCY = 4 IGNORE_STATS = 1 COMPRESSION_LEVEL = 4 COMPRESSION_TYPE = 'lz4' ENCRYPTION_METHOD = 'aes256-ctr' ENCRYPTION_KEYFILE = '/tmp/keyfile'", "", "") @@ -190,6 +191,7 @@ func TestBRIEBuilderOPtions(t *testing.T) { require.NoError(t, builder.err) e, ok := exec.(*BRIEExec) require.True(t, ok) + require.False(t, e.backupCfg.Checksum) require.Equal(t, uint(4), e.backupCfg.ChecksumConcurrency) require.Equal(t, int32(4), e.backupCfg.CompressionLevel) require.Equal(t, true, e.backupCfg.IgnoreStats) @@ -223,6 +225,7 @@ func TestBRIEBuilderOPtions(t *testing.T) { e, ok = exec.(*BRIEExec) require.True(t, ok) require.Equal(t, uint(4), e.restoreCfg.ChecksumConcurrency) + require.True(t, e.restoreCfg.Checksum) require.True(t, e.restoreCfg.WaitTiflashReady) require.True(t, e.restoreCfg.WithSysTable) require.True(t, e.restoreCfg.LoadStats)