br: Fix backup rawkv failure #32612

Merged · 13 commits · Mar 3, 2022
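
What this PR does (as read from the diff): raw-KV backup failed because BR always memcomparable-encoded user keys before region lookups and split-point checks, but raw-KV keys are stored in TiKV unencoded, so the lookups resolved to the wrong regions. The fix threads an isRawKv flag through findRegionLeader, fineGrainedBackup/handleFineGrained, NeedSplit/getSplitKeys, the split client, and the SplitRegion RPC, skipping the encoding in raw mode. Along the way it also forwards req.CipherInfo into fine-grained backup requests and adds a tikv-region-error failpoint so tests can inject a retryable region error.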
18 changes: 13 additions & 5 deletions br/pkg/backup/client.go
@@ -545,7 +545,7 @@ func (bc *Client) BackupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
-req.RateLimit, req.Concurrency, results, progressCallBack)
+req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -589,10 +589,12 @@ func (bc *Client) BackupRange(
return nil
}

-func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
+func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
-key = codec.EncodeBytes([]byte{}, key)
+if !isRawKv {
+key = codec.EncodeBytes([]byte{}, key)
+}
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -623,6 +625,8 @@ func (bc *Client) fineGrainedBackup(
compressLevel int32,
rateLimit uint64,
concurrency uint32,
+isRawKv bool,
+cipherInfo *backuppb.CipherInfo,
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
@@ -673,7 +677,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
-compressType, compressLevel, rateLimit, concurrency, respCh)
+compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
if err != nil {
errCh <- err
return
@@ -820,9 +824,11 @@ func (bc *Client) handleFineGrained(
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
+isRawKv bool,
+cipherInfo *backuppb.CipherInfo,
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
-leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
+leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
if pderr != nil {
return 0, errors.Trace(pderr)
}
@@ -837,8 +843,10 @@
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
+IsRawKv: isRawKv,
CompressionType: compressType,
CompressionLevel: compressionLevel,
+CipherInfo: cipherInfo,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
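
The core of the fix is visible in findRegionLeader above: transactional keys are stored in TiKV memcomparable-encoded, so a PD region lookup must encode the key first, while raw-KV keys are stored verbatim and an encoded lookup key would resolve to the wrong region. A minimal, self-contained sketch of that distinction (the helper name is ours, not from the PR):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/codec"
)

// regionLookupKey mirrors the encoding decision now made in
// findRegionLeader: encode for transactional data, pass through for raw.
func regionLookupKey(key []byte, isRawKv bool) []byte {
	if isRawKv {
		return key // raw-KV keys live in TiKV unencoded
	}
	return codec.EncodeBytes([]byte{}, key) // txn keys are memcomparable-encoded
}

func main() {
	key := []byte("r_0001")
	fmt.Printf("raw lookup key:     %q\n", regionLookupKey(key, true))
	fmt.Printf("encoded lookup key: %q\n", regionLookupKey(key, false))
}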
17 changes: 17 additions & 0 deletions br/pkg/backup/push.go
@@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

+regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
@@ -139,6 +141,21 @@
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-regionh-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(
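
The new failpoint feeds a one-shot region error into the response stream so that the fine-grained retry path gets exercised. A hedged sketch of how a test might arm it; the failpoint path is an assumption derived from the br/pkg/backup package path, and the actual backup invocation is elided:

package backup_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestBackupSurvivesInjectedRegionError(t *testing.T) {
	// Assumed path: module path + package path + failpoint name.
	fp := "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error"
	// The string payload becomes the injected region error's message.
	require.NoError(t, failpoint.Enable(fp, `return("region error")`))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// Run a backup here; because the error is injected only once and
	// carries a RegionError detail, fineGrainedBackup should retry the
	// failed range and the backup should still succeed.
}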
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -258,7 +258,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
}
-splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
+splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

shouldCreate := true
if cfg.Checkpoint.Enable {
5 changes: 3 additions & 2 deletions br/pkg/restore/client.go
@@ -108,6 +108,7 @@ func NewRestoreClient(
store kv.Storage,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
+isRawKv bool,
) (*Client, error) {
db, err := NewDB(g, store)
if err != nil {
@@ -126,7 +127,7 @@

return &Client{
pdClient: pdClient,
-toolClient: NewSplitClient(pdClient, tlsConf),
+toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
db: db,
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
@@ -219,7 +220,7 @@ func (rc *Client) InitBackupMeta(
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

-metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
+metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
8 changes: 4 additions & 4 deletions br/pkg/restore/client_test.go
@@ -32,7 +32,7 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{

func TestCreateTables(t *testing.T) {
m := mc
-client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg)
+client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false)
require.NoError(t, err)

info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64)
@@ -89,7 +89,7 @@ func TestCreateTables(t *testing.T) {

func TestIsOnline(t *testing.T) {
m := mc
-client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg)
+client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false)
require.NoError(t, err)

require.False(t, client.IsOnline())
@@ -99,7 +99,7 @@

func TestPreCheckTableClusterIndex(t *testing.T) {
m := mc
-client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg)
+client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false)
require.NoError(t, err)

info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64)
@@ -193,7 +193,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {

client, err := restore.NewRestoreClient(gluetidb.New(), fakePDClient{
stores: mockStores,
-}, m.Storage, nil, defaultKeepaliveCfg)
+}, m.Storage, nil, defaultKeepaliveCfg, false)
require.NoError(t, err)

tables := make([]*metautil.Table, 4)
2 changes: 1 addition & 1 deletion br/pkg/restore/pipeline_items.go
@@ -289,7 +289,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
-err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
+err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh, false)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
13 changes: 8 additions & 5 deletions br/pkg/restore/split.go
@@ -77,6 +77,7 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
+isRawKv bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
@@ -111,7 +112,7 @@ SplitRegions:
}
return errors.Trace(errScan)
}
-splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions)
+splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions, isRawKv)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
@@ -495,14 +496,14 @@ func (b *scanRegionBackoffer) Attempt() int {

// getSplitKeys checks if the regions should be split by the end key of
// the ranges, groups the split keys by region id.
-func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
+func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo, isRawKv bool) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
checkKeys := make([][]byte, 0)
for _, rg := range ranges {
checkKeys = append(checkKeys, rg.EndKey)
}
for _, key := range checkKeys {
-if region := NeedSplit(key, regions); region != nil {
+if region := NeedSplit(key, regions, isRawKv); region != nil {
splitKeys, ok := splitKeyMap[region.Region.GetId()]
if !ok {
splitKeys = make([][]byte, 0, 1)
@@ -518,12 +519,14 @@
}

// NeedSplit checks whether a key is necessary to split, if true returns the split region.
-func NeedSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
+func NeedSplit(splitKey []byte, regions []*RegionInfo, isRawKv bool) *RegionInfo {
// If splitKey is the max key.
if len(splitKey) == 0 {
return nil
}
-splitKey = codec.EncodeBytes(nil, splitKey)
+if !isRawKv {
+splitKey = codec.EncodeBytes(nil, splitKey)
+}
for _, region := range regions {
// If splitKey is the boundary of the region
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
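
Note the symmetry with findRegionLeader in br/pkg/backup/client.go: any user key compared against PD's region boundaries must be memcomparable-encoded for transactional data and left untouched for raw KV. NeedSplit, and through it getSplitKeys, now applies the same rule on the restore side.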
9 changes: 7 additions & 2 deletions br/pkg/restore/split_client.go
@@ -88,14 +88,17 @@ type pdClient struct {
// this may mislead the scatter.
needScatterVal bool
needScatterInit sync.Once

+isRawKv bool
}

// NewSplitClient returns a client used by RegionSplitter.
-func NewSplitClient(client pd.Client, tlsConf *tls.Config) SplitClient {
+func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
cli := &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
+isRawKv: isRawKv,
}
return cli
}
@@ -256,6 +259,7 @@ func splitRegionWithFailpoint(
peer *metapb.Peer,
client tikvpb.TikvClient,
keys [][]byte,
+isRawKv bool,
) (*kvrpcpb.SplitRegionResponse, error) {
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
log.Debug("failpoint not-leader-error injected.")
@@ -286,6 +290,7 @@
Peer: peer,
},
SplitKeys: keys,
+IsRawKv: isRawKv,
})
}

@@ -321,7 +326,7 @@ func (c *pdClient) sendSplitRegionRequest(
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
-resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys)
+resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
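
With the flag stored on pdClient, the SplitRegion RPC itself now tells TiKV whether the split keys are raw. A hedged sketch of the request that ends up on the wire; the hunk above only shows Peer, SplitKeys, and IsRawKv, so the other Context fields here are assumptions:

package split

import (
	"context"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/tikvpb"
)

// sendRawAwareSplit is a sketch, not a verbatim excerpt from the PR.
func sendRawAwareSplit(
	ctx context.Context,
	client tikvpb.TikvClient,
	region *metapb.Region,
	peer *metapb.Peer,
	keys [][]byte,
	isRawKv bool,
) (*kvrpcpb.SplitRegionResponse, error) {
	return client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
		Context: &kvrpcpb.Context{
			RegionId:    region.GetId(),          // assumed: standard request context
			RegionEpoch: region.GetRegionEpoch(), // assumed: standard request context
			Peer:        peer,                    // leader peer for the target region
		},
		SplitKeys: keys,    // in raw mode these are unencoded user keys
		IsRawKv:   isRawKv, // lets TiKV skip MVCC decoding of SplitKeys
	})
}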
49 changes: 29 additions & 20 deletions br/pkg/restore/split_test.go
@@ -273,7 +273,7 @@ func TestScatterFinishInTime(t *testing.T) {
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
-err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
+err := regionSplitter.Split(ctx, ranges, rewriteRules, false, func(key [][]byte) {})
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
@@ -329,7 +329,7 @@ func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
-err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
+err := regionSplitter.Split(ctx, ranges, rewriteRules, false, func(key [][]byte) {})
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
@@ -464,26 +464,35 @@ FindRegion:
}

func TestNeedSplit(t *testing.T) {
-regions := []*restore.RegionInfo{
-{
-Region: &metapb.Region{
-StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
-EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
-},
-},
-}
-// Out of region
-require.Nil(t, restore.NeedSplit([]byte("a"), regions))
-// Region start key
-require.Nil(t, restore.NeedSplit([]byte("b"), regions))
-// In region
-region := restore.NeedSplit([]byte("c"), regions)
-require.Equal(t, 0, bytes.Compare(region.Region.GetStartKey(), codec.EncodeBytes([]byte{}, []byte("b"))))
-require.Equal(t, 0, bytes.Compare(region.Region.GetEndKey(), codec.EncodeBytes([]byte{}, []byte("d"))))
-// Region end key
-require.Nil(t, restore.NeedSplit([]byte("d"), regions))
-// Out of region
-require.Nil(t, restore.NeedSplit([]byte("e"), regions))
+for _, isRawKv := range []bool{false, true} {
+encode := func(in []byte) []byte {
+if isRawKv {
+return in
+}
+return codec.EncodeBytes([]byte{}, in)
+}
+
+regions := []*restore.RegionInfo{
+{
+Region: &metapb.Region{
+StartKey: encode([]byte("b")),
+EndKey: encode([]byte("d")),
+},
+},
+}
+// Out of region
+require.Nil(t, restore.NeedSplit([]byte("a"), regions, isRawKv))
+// Region start key
+require.Nil(t, restore.NeedSplit([]byte("b"), regions, isRawKv))
+// In region
+region := restore.NeedSplit([]byte("c"), regions, isRawKv)
+require.Equal(t, 0, bytes.Compare(region.Region.GetStartKey(), encode([]byte("b"))))
+require.Equal(t, 0, bytes.Compare(region.Region.GetEndKey(), encode([]byte("d"))))
+// Region end key
+require.Nil(t, restore.NeedSplit([]byte("d"), regions, isRawKv))
+// Out of region
+require.Nil(t, restore.NeedSplit([]byte("e"), regions, isRawKv))
+}
}

func TestRegionConsistency(t *testing.T) {
5 changes: 3 additions & 2 deletions br/pkg/restore/util.go
@@ -345,10 +345,11 @@ func SplitRanges(
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
+isRawKv bool,
) error {
-splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
+splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))

-return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
+return splitter.Split(ctx, ranges, rewriteRules, isRawKv, func(keys [][]byte) {
for range keys {
updateCh.Inc()
}
2 changes: 1 addition & 1 deletion br/pkg/task/restore.go
@@ -283,7 +283,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
defer mgr.Close()

keepaliveCfg.PermitWithoutStream = true
-client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg)
+client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg, false)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions br/pkg/task/restore_raw.go
@@ -75,7 +75,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
// sometimes we have pooled the connections.
// sending heartbeats in idle times is useful.
keepaliveCfg.PermitWithoutStream = true
-client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg)
+client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg, true)
if err != nil {
return errors.Trace(err)
}
@@ -131,7 +131,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR

// RawKV restore does not need to rewrite keys.
rewrite := &restore.RewriteRules{}
-err = restore.SplitRanges(ctx, client, ranges, rewrite, updateCh)
+err = restore.SplitRanges(ctx, client, ranges, rewrite, updateCh, true)
if err != nil {
return errors.Trace(err)
}