Merge master to Release 3.1 (#234)
* enable tidb config by default (#230)

* enable tidb config by default

* backup,restore: fix --checksum flag. (#223)

* backup,restore: work in progress to fix a bug that caused the --checksum flag to not work properly.

Some backup and restore code ignored the flag (a.k.a. config.Checksum), so disabling the checksum led to failures.

* backup: backup will report total bytes and kvs when the checksum check is disabled.

* backup: add logging to ChecksumMatches and a new version of FastChecksum (a minimal sketch of the comparison appears right after this commit message).

Some of the logging had been lost; it now lives in ChecksumMatches.

* restore: restore can now detect tables without checksums and skip them automatically.

For backup, ChecksumMatches now returns an error.

* misc: add documentation for Table::NoChecksum.

* backup: omit the checksum progress bar when the user specifies `--checksum=false`.

* backup: `CopyMetaFrom` overwrites the original `client.Schemes` instead of appending to it.

* backup: refactor the checksum logic and fix a bug.

The bug caused the backup metadata to contain only one table when multiple tables were backed up.

* backup: fix some lint warnings.

* backup,restore: refactor so that the cyclomatic complexity does not grow too large.

* misc: don't use an underscore as the receiver name.

* backup: print "quick checksum success" message per table.

...to make br_full_index happy!

* backup: refactor a repeated MinInt pattern into the `utils.MinInt` helper.

* backup: Apply suggestions from code review

Co-Authored-By: kennytm <kennytm@gmail.com>

Co-authored-by: 3pointer <luancheng@pingcap.com>
Co-authored-by: kennytm <kennytm@gmail.com>

* fill size in SSTMeta (#233)

* Use table info (#231)

* create tables using their table info during incremental restore

* add test

* fix log

* address comment

* fix ci

* address comment

* update 3.1 dependency

* remove conf.Experimental.AllowsExpressionIndex

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: kennytm <kennytm@gmail.com>
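
Taken together, the checksum commits above split the old FastChecksum into CollectChecksums (gather per-table results) and ChecksumMatches (compare them with the backup metadata). Below is a minimal, self-contained sketch of that comparison step; the Checksum struct mirrors the one added in pkg/backup/client.go, while the matches helper and the sample values are illustrative stand-ins rather than code from this repository.

```go
package main

import "fmt"

// Checksum mirrors the struct this commit adds to pkg/backup/client.go.
type Checksum struct {
	Crc64Xor   uint64
	TotalKvs   uint64
	TotalBytes uint64
}

// matches reports whether the locally calculated checksums agree, table by
// table, with the values recorded in the backup metadata, which is the
// essence of the new ChecksumMatches check.
func matches(local, stored []Checksum) bool {
	if len(local) != len(stored) {
		return false
	}
	for i := range stored {
		// Struct equality compares Crc64Xor, TotalKvs and TotalBytes at once.
		if local[i] != stored[i] {
			return false
		}
	}
	return true
}

func main() {
	stored := []Checksum{{Crc64Xor: 0xdead, TotalKvs: 42, TotalBytes: 1024}}
	local := []Checksum{{Crc64Xor: 0xdead, TotalKvs: 42, TotalBytes: 1024}}
	fmt.Println("fast checksum success:", matches(local, stored)) // prints true
}
```

When the user passes `--checksum=false`, this comparison is skipped entirely: `CopyMetaFrom` fills the backup metadata from the pending schemas and `CollectFileInfo` still feeds the summary.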
3 people authored Apr 13, 2020
1 parent 9b76002 commit 8c09bb5
Showing 18 changed files with 452 additions and 129 deletions.
6 changes: 2 additions & 4 deletions go.mod
@@ -17,15 +17,13 @@ require (
github.com/klauspost/cpuid v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/montanaflynn/stats v0.5.0 // indirect
github.com/onsi/ginkgo v1.11.0 // indirect
github.com/onsi/gomega v1.8.1 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200331072206-c211b473fe43
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v3.1.0-beta.2.0.20200318061433-f0b8f6cdca0d+incompatible
github.com/pingcap/pd/v3 v3.1.0-beta.2.0.20200312100832-1206736bd050
github.com/pingcap/tidb v1.1.0-beta.0.20200401121410-5854181fbbe0
github.com/pingcap/pd/v3 v3.1.0-rc.0.20200410125843-8b7475c6bfd6
github.com/pingcap/tidb v1.1.0-beta.0.20200410101606-1347df814de0
github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200317092225-ed6b2a87af54+incompatible
github.com/pingcap/tipb v0.0.0-20200401093201-cc8b75c53383
github.com/prometheus/client_golang v1.0.0
8 changes: 8 additions & 0 deletions go.sum
@@ -304,11 +304,14 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34=
github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
@@ -348,8 +351,12 @@ github.com/pingcap/parser v3.1.0-beta.2.0.20200318061433-f0b8f6cdca0d+incompatib
github.com/pingcap/parser v3.1.0-beta.2.0.20200318061433-f0b8f6cdca0d+incompatible/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd/v3 v3.1.0-beta.2.0.20200312100832-1206736bd050 h1:mxPdR0pxnUcRfRGX2JnaLyAd9SZWeR42SzvMp4Zv3YI=
github.com/pingcap/pd/v3 v3.1.0-beta.2.0.20200312100832-1206736bd050/go.mod h1:0HfF1LfWLMuGpui0PKhGvkXxfjv1JslMRY6B+cae3dg=
github.com/pingcap/pd/v3 v3.1.0-rc.0.20200410125843-8b7475c6bfd6 h1:N3zllT6rtDYXPeKS/Khide/ClAH1FCNFU6/EStVwb8Y=
github.com/pingcap/pd/v3 v3.1.0-rc.0.20200410125843-8b7475c6bfd6/go.mod h1:su5qKHuRSE0oSO8DBmck0WK17T18OPjCNi3XToM+uCM=
github.com/pingcap/tidb v1.1.0-beta.0.20200401121410-5854181fbbe0 h1:bcl8cbL0K9oZ+vYWNJIgUH2rMkesjfsbJkpZpD2I+lg=
github.com/pingcap/tidb v1.1.0-beta.0.20200401121410-5854181fbbe0/go.mod h1:2cL/Jdq//AUbt/m/VmOwc7wm82oLFn7o/B6fiQtOpQE=
github.com/pingcap/tidb v1.1.0-beta.0.20200410101606-1347df814de0 h1:ZgAk3sPL0p5WEQhyRX8aSCwaDrPLqxMBitT1XP1UvqE=
github.com/pingcap/tidb v1.1.0-beta.0.20200410101606-1347df814de0/go.mod h1:2cL/Jdq//AUbt/m/VmOwc7wm82oLFn7o/B6fiQtOpQE=
github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200317092225-ed6b2a87af54+incompatible h1:tYADqdmWwgDOwf/qEN0trJAy6H3c3Tt/QZx1z4qVrRQ=
github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200317092225-ed6b2a87af54+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20200401093201-cc8b75c53383 h1:y1ayhtouCaO0u74JNMN8s20CGJT0yIuAb8UXOYnCALc=
@@ -588,6 +595,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 h1:gZpLHxUX5BdYLA08Lj4YCJNN/jk7KtquiArPoeX0WvA=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
103 changes: 82 additions & 21 deletions pkg/backup/client.go
@@ -48,6 +48,13 @@ type ClientMgr interface {
Close()
}

// Checksum is the checksum of some backup files calculated by CollectChecksums.
type Checksum struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

// Maximum total sleep time(in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
@@ -747,8 +754,59 @@ func SendBackup(
return nil
}

// FastChecksum check data integrity by xor all(sst_checksum) per table
func (bc *Client) FastChecksum() (bool, error) {
// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV.
func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
if len(local) != len(bc.backupMeta.Schemas) {
return false, nil
}

for i, schema := range bc.backupMeta.Schemas {
localChecksum := local[i]
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse db info.")
return false, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse table info.")
return false, err
}
if localChecksum.Crc64Xor != schema.Crc64Xor ||
localChecksum.TotalBytes != schema.TotalBytes ||
localChecksum.TotalKvs != schema.TotalKvs {
log.Error("failed in fast checksum",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tblInfo.Name),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", localChecksum.Crc64Xor),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", localChecksum.TotalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes),
)
return false, nil
}
log.Info("fast checksum success",
zap.String("database", dbInfo.Name.L),
zap.String("table", tblInfo.Name.L))
}
return true, nil
}

// CollectFileInfo collects ungrouped file summary information, like kv count and size.
func (bc *Client) CollectFileInfo() {
for _, file := range bc.backupMeta.Files {
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
}

// CollectChecksums check data integrity by xor all(sst_checksum) per table
// it returns the checksum of all local files.
func (bc *Client) CollectChecksums() ([]Checksum, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
@@ -757,19 +815,20 @@ func (bc *Client) FastChecksum() (bool, error) {

dbs, err := utils.LoadBackupTables(&bc.backupMeta)
if err != nil {
return false, err
return nil, err
}

checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err = json.Unmarshal(schema.Db, dbInfo)
if err != nil {
return false, err
return nil, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
return false, err
return nil, err
}
tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String())

@@ -784,25 +843,16 @@ func (bc *Client) FastChecksum() (bool, error) {

summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
} else {
log.Error("failed in fast checksum",
zap.String("database", dbInfo.Name.String()),
zap.String("table", tblInfo.Name.String()),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", checksum),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", totalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", totalBytes),
)
return false, nil
log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
localChecksum := Checksum{
Crc64Xor: checksum,
TotalKvs: totalKvs,
TotalBytes: totalBytes,
}
checksums = append(checksums, localChecksum)
}

return true, nil
return checksums, nil
}

// CompleteMeta wait response of admin checksum from TiDB to complete backup meta
@@ -814,3 +864,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error {
bc.backupMeta.Schemas = schemas
return nil
}

// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum.
// use this when user skip the checksum generating.
func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas))
for _, v := range backupSchemas.schemas {
schema := v
schemas = append(schemas, &schema)
}
bc.backupMeta.Schemas = schemas
}
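
The body of the per-table loop in CollectChecksums is collapsed in the hunk above. Going by the surrounding comment ("xor all(sst_checksum) per table") and the summary calls that follow, it appears to xor the per-file CRC64 values and sum the KV and byte counters; a minimal, self-contained sketch of that aggregation follows, where sstFile is an illustrative stand-in for the relevant kvproto backup.File fields rather than the actual type.

```go
package main

import "fmt"

// sstFile stands in for the kvproto backup.File fields used here: a per-file
// CRC64-XOR checksum plus KV and byte counters.
type sstFile struct {
	Crc64Xor   uint64
	TotalKvs   uint64
	TotalBytes uint64
}

// aggregate folds one table's backup files into a single per-table result:
// checksums are xor-ed together, counters are summed.
func aggregate(files []sstFile) (checksum, totalKvs, totalBytes uint64) {
	for _, f := range files {
		checksum ^= f.Crc64Xor
		totalKvs += f.TotalKvs
		totalBytes += f.TotalBytes
	}
	return checksum, totalKvs, totalBytes
}

func main() {
	files := []sstFile{
		{Crc64Xor: 0x0123, TotalKvs: 100, TotalBytes: 4096},
		{Crc64Xor: 0x4567, TotalKvs: 200, TotalBytes: 8192},
	}
	checksum, kvs, bytes := aggregate(files)
	fmt.Printf("crc64=%#x kvs=%d bytes=%d\n", checksum, kvs, bytes)
}
```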
12 changes: 0 additions & 12 deletions pkg/backup/schema.go
@@ -36,7 +36,6 @@ type Schemas struct {
backupSchemaCh chan backup.Schema
errCh chan error
wg *sync.WaitGroup
skipChecksum bool
}

func newBackupSchemas() *Schemas {
@@ -57,11 +56,6 @@ func (pending *Schemas) pushPending(
pending.schemas[name] = schema
}

// SetSkipChecksum sets whether it should skip checksum
func (pending *Schemas) SetSkipChecksum(skip bool) {
pending.skipChecksum = skip
}

// Start backups schemas
func (pending *Schemas) Start(
ctx context.Context,
@@ -81,12 +75,6 @@ func (pending *Schemas) Start(
workerPool.Apply(func() {
defer pending.wg.Done()

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh.Inc()
return
}

start := time.Now()
table := model.TableInfo{}
err := json.Unmarshal(schema.Table, &table)
8 changes: 8 additions & 0 deletions pkg/restore/client.go
@@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum(
workers.Apply(func() {
defer wg.Done()

if table.NoChecksum() {
log.Info("table doesn't have checksum, skipping checksum",
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name))
updateCh.Inc()
return
}

startTS, err := rc.GetTS(ctx)
if err != nil {
errCh <- errors.Trace(err)
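
The skip above hinges on table.NoChecksum(), whose implementation is not part of this diff. A plausible reading, given that backups made with --checksum=false record no per-table checksum, is a predicate over all-zero checksum fields; the sketch below is a hypothetical illustration with assumed type and field names, not the repository's code.

```go
package main

import "fmt"

// restoreTable is a trimmed-down, hypothetical stand-in for the restore-side
// table descriptor; the real type also carries the parsed schema and files.
type restoreTable struct {
	Crc64Xor   uint64
	TotalKvs   uint64
	TotalBytes uint64
}

// NoChecksum reports whether no checksum was recorded for this table during
// backup, in which case restore skips checksum validation for it.
func (t *restoreTable) NoChecksum() bool {
	return t.Crc64Xor == 0 && t.TotalKvs == 0 && t.TotalBytes == 0
}

func main() {
	skipped := restoreTable{} // e.g. a table backed up with --checksum=false
	fmt.Println("skip checksum:", skipped.NoChecksum()) // prints true
}
```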
22 changes: 21 additions & 1 deletion pkg/restore/db.go
@@ -45,7 +45,27 @@ func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
var err error
if ddlJob.BinlogInfo.TableInfo != nil {
tableInfo := ddlJob.BinlogInfo.TableInfo
dbInfo := ddlJob.BinlogInfo.DBInfo
switch ddlJob.Type {
case model.ActionCreateSchema:
err = db.se.CreateDatabase(ctx, dbInfo)
if err != nil {
log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err))
}
return errors.Trace(err)
case model.ActionCreateTable:
err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo)
if err != nil {
log.Error("create table failed",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tableInfo.Name),
zap.Error(err))
}
return errors.Trace(err)
}

if tableInfo != nil {
switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDbSQL)
if err != nil {
1 change: 1 addition & 0 deletions pkg/restore/util.go
@@ -115,6 +115,7 @@ func getSSTMetaFromFile(
Start: rangeStart,
End: rangeEnd,
},
Length: file.GetSize_(),
RegionId: region.GetId(),
RegionEpoch: region.GetRegionEpoch(),
}
73 changes: 46 additions & 27 deletions pkg/task/backup.go
@@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/pingcap/br/pkg/utils"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
@@ -180,39 +182,32 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Backup has finished
updateCh.Close()

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.SetSkipChecksum(!cfg.Checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}

if cfg.LastBackupTS == 0 {
var valid bool
valid, err = client.FastChecksum()
// Checksum from server, and then fulfill the backup metadata.
if cfg.Checksum {
backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len())
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)
err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum mismatch!")
return errors.Errorf("mismatched checksum")
// Checksum has finished
updateCh.Close()
// collect file information.
err = checkChecksums(client, cfg)
if err != nil {
return err
}

} else {
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
// When user specified not to calculate checksum, don't calculate checksum.
// Just... copy schemas from origin.
log.Info("Skip fast checksum because user requirement.")
client.CopyMetaFrom(backupSchemas)
// Anyway, let's collect file info for summary.
client.CollectFileInfo()
}
// Checksum has finished
updateCh.Close()

err = client.SaveBackupMeta(ctx, ddlJobs)
if err != nil {
@@ -224,6 +219,30 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return nil
}

// checkChecksums checks the checksum of the client, once failed,
// returning a error with message: "mismatched checksum".
func checkChecksums(client *backup.Client, cfg *BackupConfig) error {
checksums, err := client.CollectChecksums()
if err != nil {
return err
}
if cfg.LastBackupTS == 0 {
var matches bool
matches, err = client.ChecksumMatches(checksums)
if err != nil {
return err
}
if !matches {
log.Error("backup FastChecksum mismatch!")
return errors.New("mismatched checksum")
}
return nil
}
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
return nil
}

// parseTSString port from tidb setSnapshotTS
func parseTSString(ts string) (uint64, error) {
if len(ts) == 0 {
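
The backupSchemasConcurrency line above is where the "refactor a MinInt pattern" commit lands: the old inline comparison is replaced by a utils.MinInt call. For reference, a minimal equivalent of such a helper (an illustrative re-implementation, since the helper's body is not part of this diff):

```go
package main

import "fmt"

// MinInt returns the smaller of two ints. Go of this era has no builtin
// integer min, so the repeated if/else pattern is folded into one helper.
func MinInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Mirrors: backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len())
	fmt.Println(MinInt(4, 3)) // prints 3
}
```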