Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

--checksum=false won't work. #223

Merged
merged 17 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 82 additions & 21 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type ClientMgr interface {
Close()
}

// Checksum is the checksum of some backup files calculated by CollectChecksums.
type Checksum struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

// Maximum total sleep time(in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
Expand Down Expand Up @@ -748,8 +755,59 @@ func SendBackup(
return nil
}

// FastChecksum check data integrity by xor all(sst_checksum) per table
func (bc *Client) FastChecksum() (bool, error) {
// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV.
func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
if len(local) != len(bc.backupMeta.Schemas) {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
return false, nil
}

for i, schema := range bc.backupMeta.Schemas {
localChecksum := local[i]
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse db info.")
return false, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse table info.")
return false, err
}
if localChecksum.Crc64Xor != schema.Crc64Xor ||
localChecksum.TotalBytes != schema.TotalBytes ||
localChecksum.TotalKvs != schema.TotalKvs {
log.Error("failed in fast checksum",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tblInfo.Name),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", localChecksum.Crc64Xor),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", localChecksum.TotalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes),
)
return false, nil
}
log.Info("fast checksum success",
zap.String("database", dbInfo.Name.L),
zap.String("table", tblInfo.Name.L))
}
return true, nil
}

// CollectFileInfo collects ungrouped file summary information, like kv count and size.
func (bc *Client) CollectFileInfo() {
for _, file := range bc.backupMeta.Files {
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
}

// CollectChecksums check data integrity by xor all(sst_checksum) per table
// it returns the checksum of all local files.
func (bc *Client) CollectChecksums() ([]Checksum, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
Expand All @@ -758,19 +816,20 @@ func (bc *Client) FastChecksum() (bool, error) {

dbs, err := utils.LoadBackupTables(&bc.backupMeta)
if err != nil {
return false, err
return nil, err
}

checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err = json.Unmarshal(schema.Db, dbInfo)
if err != nil {
return false, err
return nil, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
return false, err
return nil, err
}
tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String())

Expand All @@ -785,25 +844,16 @@ func (bc *Client) FastChecksum() (bool, error) {

summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
} else {
log.Error("failed in fast checksum",
zap.String("database", dbInfo.Name.String()),
zap.String("table", tblInfo.Name.String()),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", checksum),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", totalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", totalBytes),
)
return false, nil
log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
localChecksum := Checksum{
Crc64Xor: checksum,
TotalKvs: totalKvs,
TotalBytes: totalBytes,
}
checksums = append(checksums, localChecksum)
}

return true, nil
return checksums, nil
}

// CompleteMeta wait response of admin checksum from TiDB to complete backup meta
Expand All @@ -815,3 +865,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error {
bc.backupMeta.Schemas = schemas
return nil
}

// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum.
// use this when user skip the checksum generating.
func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a deep copy here? i.e. why running

bc.backupMeta.Schemas = backupSchemas.schemas

isn't sufficient?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether it's OK to just shallow copy(i.e. will the slice be unexpected changed somewhere?), the original version(schemas.Start() -> schemas.finishTableChecksum() -> client.CompleteMeta()) makes a deep copy of the slice, so I just followed it...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I think it won't be costly since the number of schema is usually small... and this method is invoked just once per backup...

schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas))
for _, v := range backupSchemas.schemas {
schema := v
schemas = append(schemas, &schema)
}
bc.backupMeta.Schemas = schemas
}
12 changes: 0 additions & 12 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Schemas struct {
backupSchemaCh chan backup.Schema
errCh chan error
wg *sync.WaitGroup
skipChecksum bool
}

func newBackupSchemas() *Schemas {
Expand All @@ -57,11 +56,6 @@ func (pending *Schemas) pushPending(
pending.schemas[name] = schema
}

// SetSkipChecksum sets whether it should skip checksum
func (pending *Schemas) SetSkipChecksum(skip bool) {
pending.skipChecksum = skip
}

// Start backups schemas
func (pending *Schemas) Start(
ctx context.Context,
Expand All @@ -81,12 +75,6 @@ func (pending *Schemas) Start(
workerPool.Apply(func() {
defer pending.wg.Done()

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh.Inc()
return
}

start := time.Now()
table := model.TableInfo{}
err := json.Unmarshal(schema.Table, &table)
Expand Down
8 changes: 8 additions & 0 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum(
workers.Apply(func() {
defer wg.Done()

if table.NoChecksum() {
log.Info("table doesn't have checksum, skipping checksum",
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name))
updateCh.Inc()
return
}

startTS, err := rc.GetTS(ctx)
if err != nil {
errCh <- errors.Trace(err)
Expand Down
73 changes: 46 additions & 27 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/pingcap/br/pkg/utils"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
Expand Down Expand Up @@ -180,39 +182,32 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Backup has finished
updateCh.Close()

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.SetSkipChecksum(!cfg.Checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}

if cfg.LastBackupTS == 0 {
var valid bool
valid, err = client.FastChecksum()
// Checksum from server, and then fulfill the backup metadata.
if cfg.Checksum {
backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len())
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)
err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum mismatch!")
return errors.Errorf("mismatched checksum")
// Checksum has finished
updateCh.Close()
// collect file information.
err = checkChecksums(client, cfg)
if err != nil {
return err
}

} else {
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
// When user specified not to calculate checksum, don't calculate checksum.
// Just... copy schemas from origin.
log.Info("Skip fast checksum because user requirement.")
client.CopyMetaFrom(backupSchemas)
// Anyway, let's collect file info for summary.
client.CollectFileInfo()
}
// Checksum has finished
updateCh.Close()

err = client.SaveBackupMeta(ctx, ddlJobs)
if err != nil {
Expand All @@ -224,6 +219,30 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return nil
}

// checkChecksums checks the checksum of the client, once failed,
// returning a error with message: "mismatched checksum".
func checkChecksums(client *backup.Client, cfg *BackupConfig) error {
checksums, err := client.CollectChecksums()
if err != nil {
return err
}
if cfg.LastBackupTS == 0 {
var matches bool
matches, err = client.ChecksumMatches(checksums)
if err != nil {
return err
}
if !matches {
log.Error("backup FastChecksum mismatch!")
return errors.New("mismatched checksum")
}
return nil
}
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
return nil
}

// parseTSString port from tidb setSnapshotTS
func parseTSString(ts string) (uint64, error) {
if len(ts) == 0 {
Expand Down
25 changes: 11 additions & 14 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}

// Restore sst files in batch.
batchSize := int(cfg.Concurrency)
if batchSize > maxRestoreBatchSizeLimit {
batchSize = maxRestoreBatchSizeLimit // 256
}
batchSize := utils.MinInt(int(cfg.Concurrency), maxRestoreBatchSizeLimit)

tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
Expand All @@ -242,9 +239,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if len(ranges) == 0 {
break
}
if batchSize > len(ranges) {
batchSize = len(ranges)
}
batchSize = utils.MinInt(batchSize, len(ranges))
var rangeBatch []rtree.Range
ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize]

Expand Down Expand Up @@ -287,14 +282,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
updateCh.Close()

// Checksum
updateCh = g.StartProgress(
ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress)
err = client.ValidateChecksum(
ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh)
if err != nil {
return err
if cfg.Checksum {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
updateCh = g.StartProgress(
ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress)
err = client.ValidateChecksum(
ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh)
if err != nil {
return err
}
updateCh.Close()
}
updateCh.Close()

// Set task summary to success status.
summary.SetSuccessStatus(true)
Expand Down
12 changes: 12 additions & 0 deletions pkg/utils/math.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils

// MinInt choice smaller integer from its two arguments.
func MinInt(x, y int) int {
if x < y {
return x
}

return y
}
17 changes: 17 additions & 0 deletions pkg/utils/math_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils

import (
. "github.com/pingcap/check"
)

type testMathSuite struct{}

var _ = Suite(&testMathSuite{})

func (*testMathSuite) TestMinInt(c *C) {
c.Assert(MinInt(1, 2), Equals, 1)
c.Assert(MinInt(2, 1), Equals, 1)
c.Assert(MinInt(1, 1), Equals, 1)
}
5 changes: 5 additions & 0 deletions pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type Table struct {
TiFlashReplicas int
}

// NoChecksum checks whether the table has a calculated checksum.
func (tbl *Table) NoChecksum() bool {
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}

// Database wraps the schema and tables of a database.
type Database struct {
Info *model.DBInfo
Expand Down
Loading