Merge branch 'release-5.2' into release-5.2-8e11e0367f02
bestwoody authored Sep 19, 2022
2 parents d0d6f62 + ed72864
commit fb98d3a
Showing 176 changed files with 2,837 additions and 835 deletions.
br/pkg/backup/client.go (10 additions, 5 deletions)
@@ -482,7 +482,7 @@ func (bc *Client) BackupRange(
    // TODO: test fine grained backup.
    err = bc.fineGrainedBackup(
        ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
-       req.RateLimit, req.Concurrency, results, progressCallBack)
+       req.RateLimit, req.Concurrency, req.IsRawKv, results, progressCallBack)
    if err != nil {
        return errors.Trace(err)
    }
@@ -526,10 +526,12 @@ func (bc *Client) BackupRange(
    return nil
}

-func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
+func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
    // Keys are saved in encoded format in TiKV, so the key must be encoded
    // in order to find the correct region.
-   key = codec.EncodeBytes([]byte{}, key)
+   if !isRawKv {
+       key = codec.EncodeBytes([]byte{}, key)
+   }
    for i := 0; i < 5; i++ {
        // better backoff.
        region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -560,6 +562,7 @@ func (bc *Client) fineGrainedBackup(
    compressLevel int32,
    rateLimit uint64,
    concurrency uint32,
+   isRawKv bool,
    rangeTree rtree.RangeTree,
    progressCallBack func(ProgressUnit),
) error {
@@ -610,7 +613,7 @@ func (bc *Client) fineGrainedBackup(
            for rg := range retry {
                backoffMs, err :=
                    bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
-                       compressType, compressLevel, rateLimit, concurrency, respCh)
+                       compressType, compressLevel, rateLimit, concurrency, isRawKv, respCh)
                if err != nil {
                    errCh <- err
                    return
@@ -755,9 +758,10 @@ func (bc *Client) handleFineGrained(
    compressionLevel int32,
    rateLimit uint64,
    concurrency uint32,
+   isRawKv bool,
    respCh chan<- *backuppb.BackupResponse,
) (int, error) {
-   leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
+   leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
    if pderr != nil {
        return 0, errors.Trace(pderr)
    }
@@ -772,6 +776,7 @@ func (bc *Client) handleFineGrained(
        StorageBackend:   bc.backend,
        RateLimit:        rateLimit,
        Concurrency:      concurrency,
+       IsRawKv:          isRawKv,
        CompressionType:  compressType,
        CompressionLevel: compressionLevel,
    }
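Why the new isRawKv flag matters in findRegionLeader: TiKV stores transactional keys in memcomparable-encoded form, so a PD region lookup must encode the key first, while raw-KV regions are indexed by the raw key itself. A minimal sketch of that dispatch (the lookupKey helper is hypothetical, for illustration only):

    package main

    import (
        "fmt"

        "github.com/pingcap/tidb/util/codec"
    )

    // lookupKey returns the key to send to PD for a region lookup,
    // mirroring the patched logic in findRegionLeader.
    func lookupKey(key []byte, isRawKv bool) []byte {
        if isRawKv {
            return key // raw-KV regions are indexed by the raw key
        }
        // Transactional keys are memcomparable-encoded in TiKV.
        return codec.EncodeBytes([]byte{}, key)
    }

    func main() {
        fmt.Printf("%q\n", lookupKey([]byte("a"), true))
        fmt.Printf("%q\n", lookupKey([]byte("a"), false))
    }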
br/pkg/backup/client_test.go (0 additions, 2 deletions)
@@ -241,7 +241,6 @@ func (r *testBackup) TestSendCreds(c *C) {
    c.Assert(err, IsNil)
    opts := &storage.ExternalStorageOptions{
        SendCredentials: true,
-       SkipCheckPath:   true,
    }
    _, err = storage.New(r.ctx, backend, opts)
    c.Assert(err, IsNil)
@@ -260,7 +259,6 @@ func (r *testBackup) TestSendCreds(c *C) {
    c.Assert(err, IsNil)
    opts = &storage.ExternalStorageOptions{
        SendCredentials: false,
-       SkipCheckPath:   true,
    }
    _, err = storage.New(r.ctx, backend, opts)
    c.Assert(err, IsNil)
br/pkg/backup/push.go (17 additions, 0 deletions)
@@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
        close(push.respCh)
    }()

+   regionErrorIngestedOnce := false
    for {
        select {
        case respAndStore, ok := <-push.respCh:
@@ -139,6 +141,21 @@ func (push *pushDown) pushBackup(
                        Msg: msg,
                    }
                })
+               failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
+                   if !regionErrorIngestedOnce {
+                       msg := val.(string)
+                       logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
+                       resp.Error = &backuppb.Error{
+                           // Msg: msg,
+                           Detail: &backuppb.Error_RegionError{
+                               RegionError: &errorpb.Error{
+                                   Message: msg,
+                               },
+                           },
+                       }
+                   }
+                   regionErrorIngestedOnce = true
+               })
                if resp.GetError() == nil {
                    // A nil error means the range has been backed up successfully.
                    res.Put(
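For reference, such a failpoint is normally switched on from a test through the pingcap/failpoint API, addressing it by package import path plus failpoint name. A sketch under that assumption (the test function itself is hypothetical):

    package backup_test

    import (
        "testing"

        "github.com/pingcap/failpoint"
    )

    func TestPushBackupRetriesRegionError(t *testing.T) {
        fp := "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error"
        // The injected closure receives val = "retryable error".
        if err := failpoint.Enable(fp, `return("retryable error")`); err != nil {
            t.Fatal(err)
        }
        defer func() {
            if err := failpoint.Disable(fp); err != nil {
                t.Fatal(err)
            }
        }()
        // ... run pushBackup here and assert that the injected region
        // error is surfaced exactly once and then retried ...
    }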
br/pkg/lightning/backend/local/local.go (9 additions, 3 deletions)
@@ -91,7 +91,8 @@ const (
    gRPCBackOffMaxDelay = 10 * time.Minute

    // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
-   regionMaxKeyCount = 1_440_000
+   // Lower the max key count to avoid TiKV triggering a region auto-split.
+   regionMaxKeyCount      = 1_280_000
    defaultRegionSplitSize = 96 * units.MiB

    propRangeIndex = "tikv.range_index"
@@ -910,7 +911,7 @@ func NewLocalBackend(
    if err != nil {
        return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
    }
-   splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
+   splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

    shouldCreate := true
    if enableCheckpoint {
@@ -1499,7 +1500,12 @@ func (local *local) WriteToTiKV(
    size := int64(0)
    totalCount := int64(0)
    firstLoop := true
-   regionMaxSize := local.regionSplitSize * 4 / 3
+   // If region-split-size <= 96 MiB, bump the threshold a bit to avoid too many
+   // retried splits, because the range properties are not 100% accurate.
+   regionMaxSize := local.regionSplitSize
+   if regionMaxSize <= defaultRegionSplitSize {
+       regionMaxSize = regionMaxSize * 4 / 3
+   }

    for iter.First(); iter.Valid(); iter.Next() {
        size += int64(len(iter.Key()) + len(iter.Value()))
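A quick check of the new threshold arithmetic (a standalone sketch, not code from the patch): with the default 96 MiB split size the effective cap becomes 128 MiB, while a larger configured split size is used unchanged.

    package main

    import "fmt"

    const mib = 1 << 20

    // maxIngestSize mirrors the patched logic in WriteToTiKV: split sizes at or
    // below the 96 MiB default get a 4/3 safety margin because range properties
    // are only approximate; larger configured sizes are used as-is.
    func maxIngestSize(regionSplitSize int64) int64 {
        const defaultRegionSplitSize = 96 * mib
        if regionSplitSize <= defaultRegionSplitSize {
            return regionSplitSize * 4 / 3
        }
        return regionSplitSize
    }

    func main() {
        fmt.Println(maxIngestSize(96*mib) / mib)  // 128
        fmt.Println(maxIngestSize(256*mib) / mib) // 256
    }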
br/pkg/lightning/backend/noop/noop.go (13 additions, 6 deletions)
@@ -138,7 +138,7 @@ func (b noopBackend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
func (b noopBackend) LocalWriter(context.Context, *backend.LocalWriterConfig, uuid.UUID) (backend.EngineWriter, error) {
-   return noopWriter{}, nil
+   return Writer{}, nil
}

func (b noopBackend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table) error {
@@ -168,16 +168,23 @@ func (r noopRow) Size() uint64 {
func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) {
}

-type noopWriter struct{}
+// Writer defines a local writer that does nothing.
+type Writer struct{}

-func (w noopWriter) AppendRows(context.Context, string, []string, kv.Rows) error {
+func (w Writer) AppendRows(context.Context, string, []string, kv.Rows) error {
    return nil
}

-func (w noopWriter) IsSynced() bool {
+func (w Writer) IsSynced() bool {
    return true
}

-func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
-   return nil, nil
+func (w Writer) Close(context.Context) (backend.ChunkFlushStatus, error) {
+   return trueStatus{}, nil
}
+
+type trueStatus struct{}
+
+func (s trueStatus) Flushed() bool {
+   return true
+}
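A note on the Close change: callers may invoke Flushed() on the returned backend.ChunkFlushStatus, and the old "return nil, nil" handed back a nil interface that would panic at that call site, so the exported Writer now returns a concrete trueStatus value instead.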
br/pkg/lightning/lightning.go (13 additions, 0 deletions)
@@ -277,6 +277,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.Glue)
        return errors.Annotate(err, "create storage failed")
    }

+   // Returning expectedErr from the walk callback means at least one file was found.
+   expectedErr := errors.New("Stop Iter")
+   walkErr := s.WalkDir(ctx, &storage.WalkOption{ListCount: 1}, func(string, int64) error {
+       // Return an error on the first regular file to break out of the walk loop.
+       return expectedErr
+   })
+   if !errors.ErrorEqual(walkErr, expectedErr) {
+       if walkErr == nil {
+           return errors.Errorf("data-source-dir '%s' doesn't exist or contains no files", taskCfg.Mydumper.SourceDir)
+       }
+       return errors.Annotatef(walkErr, "visit data-source-dir '%s' failed", taskCfg.Mydumper.SourceDir)
+   }
+
    loadTask := log.L().Begin(zap.InfoLevel, "load data source")
    var mdl *mydump.MDLoader
    mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s)
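The walk aborts through a sentinel error as soon as one entry is seen, which is the cheapest way to ask "does this directory contain any file?" against remote storage. The same pattern over a local filesystem, as a self-contained sketch (the dirHasFiles helper is illustrative, not from the patch):

    package main

    import (
        "errors"
        "fmt"
        "io/fs"
        "path/filepath"
    )

    // errStopIter is a sentinel used only to break out of the walk early.
    var errStopIter = errors.New("stop iteration")

    // dirHasFiles reports whether dir contains at least one regular file.
    func dirHasFiles(dir string) (bool, error) {
        err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
            if err != nil {
                return err
            }
            if d.Type().IsRegular() {
                return errStopIter // found a file; abort the walk
            }
            return nil
        })
        if errors.Is(err, errStopIter) {
            return true, nil
        }
        return false, err
    }

    func main() {
        ok, err := dirHasFiles(".")
        fmt.Println(ok, err)
    }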
br/pkg/lightning/mydump/loader.go (41 additions, 9 deletions)
@@ -17,10 +17,12 @@ import (
"context"
"path/filepath"
"sort"
"strings"

"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -29,12 +31,30 @@

type MDDatabaseMeta struct {
    Name       string
-   SchemaFile string
+   SchemaFile FileInfo
    Tables     []*MDTableMeta
    Views      []*MDTableMeta
    charSet    string
}

+func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error) {
+   schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
+   if err != nil {
+       log.L().Warn("failed to extract table schema",
+           zap.String("Path", m.SchemaFile.FileMeta.Path),
+           log.ShortError(err),
+       )
+       schema = nil
+   }
+   schemaStr := strings.TrimSpace(string(schema))
+   // Fall back to a default statement if the schema SQL is empty.
+   if len(schemaStr) == 0 {
+       schemaStr = "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
+   }
+
+   return schemaStr, nil
+}
+
type MDTableMeta struct {
    DB   string
    Name string
@@ -218,7 +238,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage) error {
    // setup database schema
    if len(s.dbSchemas) != 0 {
        for _, fileInfo := range s.dbSchemas {
-           if _, dbExists := s.insertDB(fileInfo.TableName.Schema, fileInfo.FileMeta.Path); dbExists && s.loader.router == nil {
+           if _, dbExists := s.insertDB(fileInfo); dbExists && s.loader.router == nil {
                return errors.Errorf("invalid database schema file, duplicated item - %s", fileInfo.FileMeta.Path)
            }
        }
@@ -405,23 +425,29 @@ func (s *mdLoaderSetup) route() error {
    return nil
}

-func (s *mdLoaderSetup) insertDB(dbName string, path string) (*MDDatabaseMeta, bool) {
-   dbIndex, ok := s.dbIndexMap[dbName]
+func (s *mdLoaderSetup) insertDB(f FileInfo) (*MDDatabaseMeta, bool) {
+   dbIndex, ok := s.dbIndexMap[f.TableName.Schema]
    if ok {
        return s.loader.dbs[dbIndex], true
    }
-   s.dbIndexMap[dbName] = len(s.loader.dbs)
+   s.dbIndexMap[f.TableName.Schema] = len(s.loader.dbs)
    ptr := &MDDatabaseMeta{
-       Name:       dbName,
-       SchemaFile: path,
+       Name:       f.TableName.Schema,
+       SchemaFile: f,
        charSet:    s.loader.charSet,
    }
    s.loader.dbs = append(s.loader.dbs, ptr)
    return ptr, false
}

func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool) {
-   dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
+   dbFileInfo := FileInfo{
+       TableName: filter.Table{
+           Schema: fileInfo.TableName.Schema,
+       },
+       FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
+   }
+   dbMeta, dbExists := s.insertDB(dbFileInfo)
    tableIndex, ok := s.tableIndexMap[fileInfo.TableName]
    if ok {
        return dbMeta.Tables[tableIndex], dbExists, true
@@ -441,7 +467,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool) {
}

func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) {
-   dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
+   dbFileInfo := FileInfo{
+       TableName: filter.Table{
+           Schema: fileInfo.TableName.Schema,
+       },
+       FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
+   }
+   dbMeta, dbExists := s.insertDB(dbFileInfo)
    _, ok := s.tableIndexMap[fileInfo.TableName]
    if ok {
        meta := &MDTableMeta{
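With SchemaFile now a FileInfo, a database discovered only through its table files gets a synthetic entry of type SourceTypeSchemaSchema with no path, and GetSchema falls back to synthesizing the DDL. The fallback in isolation (a simplified sketch; escapeIdentifier stands in for common.EscapeIdentifier):

    package main

    import (
        "fmt"
        "strings"
    )

    // databaseDDL mirrors the fallback in MDDatabaseMeta.GetSchema: when no
    // schema SQL could be read from the dump, synthesize a CREATE DATABASE.
    func databaseDDL(name, schemaSQL string) string {
        if s := strings.TrimSpace(schemaSQL); s != "" {
            return s
        }
        return "CREATE DATABASE IF NOT EXISTS " + escapeIdentifier(name)
    }

    // escapeIdentifier quotes a MySQL identifier with backticks,
    // doubling any embedded backtick.
    func escapeIdentifier(name string) string {
        return "`" + strings.ReplaceAll(name, "`", "``") + "`"
    }

    func main() {
        fmt.Println(databaseDDL("test", ""))
        fmt.Println(databaseDDL("db1", "CREATE DATABASE `db1`;"))
    }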
(The remaining 169 changed files are not shown here.)
