Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: restore checksum shouldn't rely on backup checksum #56712

Merged
merged 13 commits into from
Nov 18, 2024
10 changes: 9 additions & 1 deletion br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (

func runBackupCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}
overrideDefaultBackupConfigIfNeeded(&cfg, command)

if err := metricsutil.RegisterMetricsForBR(cfg.PD, cfg.KeyspaceName); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -211,3 +212,10 @@ func newTxnBackupCommand() *cobra.Command {
task.DefineTxnBackupFlags(command)
return command
}

func overrideDefaultBackupConfigIfNeeded(config *task.BackupConfig, cmd *cobra.Command) {
// override only if flag not set by user
if !cmd.Flags().Changed(task.FlagChecksum) {
config.Checksum = false
}
}
6 changes: 4 additions & 2 deletions br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func timestampLogFileName() string {
return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
}

// AddFlags adds flags to the given cmd.
func AddFlags(cmd *cobra.Command) {
// DefineCommonFlags defines the common flags for all BR cmd operation.
func DefineCommonFlags(cmd *cobra.Command) {
cmd.Version = build.Info()
cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR")
cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n")
Expand All @@ -104,6 +104,8 @@ func AddFlags(cmd *cobra.Command) {
"Set whether to redact sensitive info in log")
cmd.PersistentFlags().String(FlagStatusAddr, "",
"Set the HTTP listening address for the status report service. Set to empty string to disable")

// defines BR task common flags, this is shared by cmd and sql(brie)
task.DefineCommonFlags(cmd.PersistentFlags())

cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
TraverseChildren: true,
SilenceUsage: true,
}
AddFlags(rootCmd)
DefineCommonFlags(rootCmd)
SetDefaultContext(ctx)
rootCmd.AddCommand(
NewDebugCommand(),
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func runRestoreCommand(command *cobra.Command, cmdName string) error {
cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (ss *Schemas) BackupSchemas(
}

var checksum *checkpoint.ChecksumItem
var exists bool = false
var exists = false
if ss.checkpointChecksum != nil && schema.tableInfo != nil {
checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (ss *Schemas) BackupSchemas(
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("calculate-take", calculateCost))
zap.Duration("TimeTaken", calculateCost))
}
}
if statsHandle != nil {
Expand Down
31 changes: 25 additions & 6 deletions br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ type Table struct {
StatsFileIndexes []*backuppb.StatsFileIndex
}

// NoChecksum checks whether the table has a calculated checksum.
func (tbl *Table) NoChecksum() bool {
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}

// MetaReader wraps a reader to read both old and new version of backupmeta.
type MetaReader struct {
storage storage.ExternalStorage
Expand Down Expand Up @@ -235,14 +230,38 @@ func (reader *MetaReader) readDataFiles(ctx context.Context, output func(*backup
}

// ArchiveSize return the size of Archive data
func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 {
func ArchiveSize(files []*backuppb.File) uint64 {
total := uint64(0)
for _, file := range files {
total += file.Size_
}
return total
}

type ChecksumStats struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

func (stats ChecksumStats) ChecksumExists() bool {
if stats.Crc64Xor == 0 && stats.TotalKvs == 0 && stats.TotalBytes == 0 {
return false
}
return true
}

// CalculateChecksumStatsOnFiles returns the ChecksumStats for the given files
func CalculateChecksumStatsOnFiles(files []*backuppb.File) ChecksumStats {
var stats ChecksumStats
for _, file := range files {
stats.Crc64Xor ^= file.Crc64Xor
stats.TotalKvs += file.TotalKvs
stats.TotalBytes += file.TotalBytes
}
return stats
}

// ReadDDLs reads the ddls from the backupmeta.
// This function is compatible with the old backupmeta.
func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) {
Expand Down
30 changes: 17 additions & 13 deletions br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ func (rc *SnapClient) needLoadSchemas(backupMeta *backuppb.BackupMeta) bool {
return !(backupMeta.IsRawKv || backupMeta.IsTxnKv)
}

// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
func (rc *SnapClient) InitBackupMeta(
// LoadSchemaIfNeededAndInitClient loads schemas from BackupMeta to initialize RestoreClient.
func (rc *SnapClient) LoadSchemaIfNeededAndInitClient(
c context.Context,
backupMeta *backuppb.BackupMeta,
backend *backuppb.StorageBackend,
Expand Down Expand Up @@ -989,7 +989,7 @@ func (rc *SnapClient) setSpeedLimit(ctx context.Context, rateLimit uint64) error
return nil
}

func (rc *SnapClient) execChecksum(
func (rc *SnapClient) execAndValidateChecksum(
ctx context.Context,
tbl *CreatedTable,
kvClient kv.Client,
Expand All @@ -1000,13 +1000,14 @@ func (rc *SnapClient) execChecksum(
zap.String("table", tbl.OldTable.Info.Name.O),
)

if tbl.OldTable.NoChecksum() {
expectedChecksumStats := metautil.CalculateChecksumStatsOnFiles(tbl.OldTable.Files)
if !expectedChecksumStats.ChecksumExists() {
Copy link
Contributor Author

@Tristan1900 Tristan1900 Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we need to do the check here, if an empty table is backed up, should we check after restore if it's still empty?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the error log is misleading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should I remove the checking here at all?

logger.Warn("table has no checksum, skipping checksum")
return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.execChecksum", opentracing.ChildOf(span.Context()))
span1 := span.Tracer().StartSpan("Client.execAndValidateChecksum", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
Expand Down Expand Up @@ -1046,21 +1047,24 @@ func (rc *SnapClient) execChecksum(
}
}
}
table := tbl.OldTable
if item.Crc64xor != table.Crc64Xor ||
item.TotalKvs != table.TotalKvs ||
item.TotalBytes != table.TotalBytes {
checksumMatch := item.Crc64xor == expectedChecksumStats.Crc64Xor &&
item.TotalKvs == expectedChecksumStats.TotalKvs &&
item.TotalBytes == expectedChecksumStats.TotalBytes
failpoint.Inject("full-restore-validate-checksum", func(_ failpoint.Value) {
checksumMatch = false
})
if !checksumMatch {
logger.Error("failed in validate checksum",
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("expected tidb crc64", expectedChecksumStats.Crc64Xor),
zap.Uint64("calculated crc64", item.Crc64xor),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
zap.Uint64("expected tidb total kvs", expectedChecksumStats.TotalKvs),
zap.Uint64("calculated total kvs", item.TotalKvs),
zap.Uint64("origin tidb total bytes", table.TotalBytes),
zap.Uint64("expected tidb total bytes", expectedChecksumStats.TotalBytes),
zap.Uint64("calculated total bytes", item.TotalBytes),
)
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
logger.Info("success in validate checksum")
logger.Info("success in validating checksum")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you also add the table / database name to this log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (rc *SnapClient) GoValidateChecksum(
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execChecksum(c, tbl, kvClient, concurrency)
err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency)
if err != nil {
return errors.Trace(err)
}
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -159,7 +158,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
}

// ParseFromFlags parses the backup-related flags from the flag set.
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error {
timeAgo, err := flags.GetDuration(flagBackupTimeago)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -212,9 +211,13 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.CompressionConfig = *compressionCfg

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
// parse common flags if needed
if !skipCommonConfig {
if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
}

cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -788,18 +791,15 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) {
return oracle.GoTimeToTS(t1), nil
}

func DefaultBackupConfig() BackupConfig {
func DefaultBackupConfig(commonConfig Config) BackupConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineBackupFlags(fs)
cfg := BackupConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
err := cfg.ParseFromFlags(fs, true)
if err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
log.Panic("failed to parse backup flags to config", zap.Error(err))
}
cfg.Config = commonConfig
return cfg
}

Expand Down
13 changes: 9 additions & 4 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
flagRateLimit = "ratelimit"
flagRateLimitUnit = "ratelimit-unit"
flagConcurrency = "concurrency"
flagChecksum = "checksum"
FlagChecksum = "checksum"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagRemoveTiFlash = "remove-tiflash"
Expand Down Expand Up @@ -297,7 +297,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of checksumming in one table")

flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node")
flags.Bool(flagChecksum, true, "Run checksum at end of task")
flags.Bool(FlagChecksum, true, "Run checksum at end of task")
flags.Bool(flagRemoveTiFlash, true,
"Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")

Expand Down Expand Up @@ -359,7 +359,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {

// HiddenFlagsForStream temporary hidden flags that stream cmd not support.
func HiddenFlagsForStream(flags *pflag.FlagSet) {
_ = flags.MarkHidden(flagChecksum)
_ = flags.MarkHidden(FlagChecksum)
_ = flags.MarkHidden(flagLoadStats)
_ = flags.MarkHidden(flagChecksumConcurrency)
_ = flags.MarkHidden(flagRateLimit)
Expand Down Expand Up @@ -609,7 +609,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.Checksum, err = flags.GetBool(flagChecksum); err != nil {
if cfg.Checksum, err = flags.GetBool(FlagChecksum); err != nil {
return errors.Trace(err)
}
if cfg.ChecksumConcurrency, err = flags.GetUint(flagChecksumConcurrency); err != nil {
Expand Down Expand Up @@ -777,6 +777,11 @@ func (cfg *Config) parseAndValidateMasterKeyInfo(hasPlaintextKey bool, flags *pf
return nil
}

// OverrideDefaultForBackup override common config for backup tasks
func (cfg *Config) OverrideDefaultForBackup() {
cfg.Checksum = false
}

// NewMgr creates a new mgr at the given PD address.
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
Expand Down
11 changes: 8 additions & 3 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ func expectedDefaultConfig() Config {
}

func expectedDefaultBackupConfig() BackupConfig {
defaultConfig := expectedDefaultConfig()
defaultConfig.Checksum = false
return BackupConfig{
Config: expectedDefaultConfig(),
Config: defaultConfig,
GCTTL: utils.DefaultBRGCSafePointTTL,
CompressionConfig: CompressionConfig{
CompressionType: backup.CompressionType_ZSTD,
Expand Down Expand Up @@ -270,13 +272,16 @@ func TestDefault(t *testing.T) {
}

func TestDefaultBackup(t *testing.T) {
def := DefaultBackupConfig()
commonConfig := DefaultConfig()
commonConfig.OverrideDefaultForBackup()
def := DefaultBackupConfig(commonConfig)
defaultConfig := expectedDefaultBackupConfig()
require.Equal(t, defaultConfig, def)
}

func TestDefaultRestore(t *testing.T) {
def := DefaultRestoreConfig()
commonConfig := DefaultConfig()
def := DefaultRestoreConfig(commonConfig)
defaultConfig := expectedDefaultRestoreConfig()
require.Equal(t, defaultConfig, def)
}
Expand Down
Loading