Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup,restore: fix --checksum flag. (#223)
Browse files Browse the repository at this point in the history
* backup,restore: work on progress to fix a bug that causes --checksum flag won't work properly.

Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure.

* backup: backup will report total bytes and kvs when checksums check disabled.

Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure.

* backup: backup will report total bytes and kvs when checksums check disabled.

Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure.

* backup: add log to ChecksumMatches and new version of FastChecksum.

Some of log has been lose. They are in ChecksumMatches now.

* restore: restore could find non-checksum tables and skip them automatically.

for backup, ChecksumMatches returns error now.

* misc: add document for Table::NoChecksum.

* backup: omit checksum progress bar when user specify `--checksum=false`.

* backup: `CopyMetaFrom` overrides original `client.Schemes` instead of append at its end.

* backup: refactor about checksum logic, fix a bug.

the bug would cause: when multi tables are backup, the metadata contains only one table.

* backup: do some lints.

* backup,restore: do some refactor so that cyclomatic complexity won't be too large.

* misc: don't use underscore on receiver.

* backup: print "quick checksum success" message per table.

...to make br_full_index happy!

* backup: refactor a MinInt pattern.

* backup: Apply suggestions from code review

Co-Authored-By: kennytm <kennytm@gmail.com>

Co-authored-by: 3pointer <luancheng@pingcap.com>
Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
3 people authored Apr 10, 2020
1 parent a838be7 commit 593c33d
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 74 deletions.
103 changes: 82 additions & 21 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type ClientMgr interface {
Close()
}

// Checksum is the checksum of some backup files calculated by CollectChecksums.
type Checksum struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

// Maximum total sleep time(in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
Expand Down Expand Up @@ -748,8 +755,59 @@ func SendBackup(
return nil
}

// FastChecksum check data integrity by xor all(sst_checksum) per table
func (bc *Client) FastChecksum() (bool, error) {
// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV.
func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
if len(local) != len(bc.backupMeta.Schemas) {
return false, nil
}

for i, schema := range bc.backupMeta.Schemas {
localChecksum := local[i]
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse db info.")
return false, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse table info.")
return false, err
}
if localChecksum.Crc64Xor != schema.Crc64Xor ||
localChecksum.TotalBytes != schema.TotalBytes ||
localChecksum.TotalKvs != schema.TotalKvs {
log.Error("failed in fast checksum",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tblInfo.Name),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", localChecksum.Crc64Xor),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", localChecksum.TotalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes),
)
return false, nil
}
log.Info("fast checksum success",
zap.String("database", dbInfo.Name.L),
zap.String("table", tblInfo.Name.L))
}
return true, nil
}

// CollectFileInfo collects ungrouped file summary information, like kv count and size.
func (bc *Client) CollectFileInfo() {
for _, file := range bc.backupMeta.Files {
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
}

// CollectChecksums check data integrity by xor all(sst_checksum) per table
// it returns the checksum of all local files.
func (bc *Client) CollectChecksums() ([]Checksum, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
Expand All @@ -758,19 +816,20 @@ func (bc *Client) FastChecksum() (bool, error) {

dbs, err := utils.LoadBackupTables(&bc.backupMeta)
if err != nil {
return false, err
return nil, err
}

checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err = json.Unmarshal(schema.Db, dbInfo)
if err != nil {
return false, err
return nil, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
return false, err
return nil, err
}
tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String())

Expand All @@ -785,25 +844,16 @@ func (bc *Client) FastChecksum() (bool, error) {

summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
} else {
log.Error("failed in fast checksum",
zap.String("database", dbInfo.Name.String()),
zap.String("table", tblInfo.Name.String()),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", checksum),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", totalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", totalBytes),
)
return false, nil
log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
localChecksum := Checksum{
Crc64Xor: checksum,
TotalKvs: totalKvs,
TotalBytes: totalBytes,
}
checksums = append(checksums, localChecksum)
}

return true, nil
return checksums, nil
}

// CompleteMeta wait response of admin checksum from TiDB to complete backup meta
Expand All @@ -815,3 +865,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error {
bc.backupMeta.Schemas = schemas
return nil
}

// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum.
// use this when user skip the checksum generating.
func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas))
for _, v := range backupSchemas.schemas {
schema := v
schemas = append(schemas, &schema)
}
bc.backupMeta.Schemas = schemas
}
12 changes: 0 additions & 12 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Schemas struct {
backupSchemaCh chan backup.Schema
errCh chan error
wg *sync.WaitGroup
skipChecksum bool
}

func newBackupSchemas() *Schemas {
Expand All @@ -57,11 +56,6 @@ func (pending *Schemas) pushPending(
pending.schemas[name] = schema
}

// SetSkipChecksum sets whether it should skip checksum
func (pending *Schemas) SetSkipChecksum(skip bool) {
pending.skipChecksum = skip
}

// Start backups schemas
func (pending *Schemas) Start(
ctx context.Context,
Expand All @@ -81,12 +75,6 @@ func (pending *Schemas) Start(
workerPool.Apply(func() {
defer pending.wg.Done()

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh.Inc()
return
}

start := time.Now()
table := model.TableInfo{}
err := json.Unmarshal(schema.Table, &table)
Expand Down
8 changes: 8 additions & 0 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum(
workers.Apply(func() {
defer wg.Done()

if table.NoChecksum() {
log.Info("table doesn't have checksum, skipping checksum",
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name))
updateCh.Inc()
return
}

startTS, err := rc.GetTS(ctx)
if err != nil {
errCh <- errors.Trace(err)
Expand Down
73 changes: 46 additions & 27 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/pingcap/br/pkg/utils"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
Expand Down Expand Up @@ -180,39 +182,32 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Backup has finished
updateCh.Close()

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.SetSkipChecksum(!cfg.Checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}

if cfg.LastBackupTS == 0 {
var valid bool
valid, err = client.FastChecksum()
// Checksum from server, and then fulfill the backup metadata.
if cfg.Checksum {
backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len())
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)
err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum mismatch!")
return errors.Errorf("mismatched checksum")
// Checksum has finished
updateCh.Close()
// collect file information.
err = checkChecksums(client, cfg)
if err != nil {
return err
}

} else {
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
// When user specified not to calculate checksum, don't calculate checksum.
// Just... copy schemas from origin.
log.Info("Skip fast checksum because user requirement.")
client.CopyMetaFrom(backupSchemas)
// Anyway, let's collect file info for summary.
client.CollectFileInfo()
}
// Checksum has finished
updateCh.Close()

err = client.SaveBackupMeta(ctx, ddlJobs)
if err != nil {
Expand All @@ -224,6 +219,30 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return nil
}

// checkChecksums checks the checksum of the client, once failed,
// returning a error with message: "mismatched checksum".
func checkChecksums(client *backup.Client, cfg *BackupConfig) error {
checksums, err := client.CollectChecksums()
if err != nil {
return err
}
if cfg.LastBackupTS == 0 {
var matches bool
matches, err = client.ChecksumMatches(checksums)
if err != nil {
return err
}
if !matches {
log.Error("backup FastChecksum mismatch!")
return errors.New("mismatched checksum")
}
return nil
}
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
return nil
}

// parseTSString port from tidb setSnapshotTS
func parseTSString(ts string) (uint64, error) {
if len(ts) == 0 {
Expand Down
25 changes: 11 additions & 14 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}

// Restore sst files in batch.
batchSize := int(cfg.Concurrency)
if batchSize > maxRestoreBatchSizeLimit {
batchSize = maxRestoreBatchSizeLimit // 256
}
batchSize := utils.MinInt(int(cfg.Concurrency), maxRestoreBatchSizeLimit)

tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
Expand All @@ -242,9 +239,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if len(ranges) == 0 {
break
}
if batchSize > len(ranges) {
batchSize = len(ranges)
}
batchSize = utils.MinInt(batchSize, len(ranges))
var rangeBatch []rtree.Range
ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize]

Expand Down Expand Up @@ -287,14 +282,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
updateCh.Close()

// Checksum
updateCh = g.StartProgress(
ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress)
err = client.ValidateChecksum(
ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh)
if err != nil {
return err
if cfg.Checksum {
updateCh = g.StartProgress(
ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress)
err = client.ValidateChecksum(
ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh)
if err != nil {
return err
}
updateCh.Close()
}
updateCh.Close()

// Set task summary to success status.
summary.SetSuccessStatus(true)
Expand Down
12 changes: 12 additions & 0 deletions pkg/utils/math.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils

// MinInt choice smaller integer from its two arguments.
func MinInt(x, y int) int {
if x < y {
return x
}

return y
}
17 changes: 17 additions & 0 deletions pkg/utils/math_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils

import (
. "github.com/pingcap/check"
)

type testMathSuite struct{}

var _ = Suite(&testMathSuite{})

func (*testMathSuite) TestMinInt(c *C) {
c.Assert(MinInt(1, 2), Equals, 1)
c.Assert(MinInt(2, 1), Equals, 1)
c.Assert(MinInt(1, 1), Equals, 1)
}
5 changes: 5 additions & 0 deletions pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type Table struct {
TiFlashReplicas int
}

// NoChecksum checks whether the table has a calculated checksum.
func (tbl *Table) NoChecksum() bool {
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}

// Database wraps the schema and tables of a database.
type Database struct {
Info *model.DBInfo
Expand Down
Loading

0 comments on commit 593c33d

Please sign in to comment.