br: Support backup replica read #40899

Merged
merged 24 commits into from
Feb 9, 2023
Changes from 16 commits
Commits
24 commits
c66cb8f
Support backup replica read
v01dstar Dec 29, 2022
334598a
Clean up
v01dstar Jan 31, 2023
3ac6c75
Add integration test for backup replica read
v01dstar Feb 1, 2023
719ba97
Use store label instead of role for backup replica read
v01dstar Feb 2, 2023
9351c51
Use kvproto main branch
v01dstar Feb 6, 2023
f30d6e6
Fix typo
v01dstar Feb 6, 2023
7a955b3
Remove mistakenly added file
v01dstar Feb 6, 2023
16ddeb5
Make backup returns earlier if no store matches the label
v01dstar Feb 6, 2023
1899385
Add fail case for backup replica read test
v01dstar Feb 6, 2023
b6f166a
Revert back placement when test is done
v01dstar Feb 6, 2023
72cf514
Fix test
v01dstar Feb 8, 2023
2304ece
Prolong the peer lookup retry time, fix test
v01dstar Feb 8, 2023
351d6e6
Stablize test
v01dstar Feb 8, 2023
efdc3bb
Normalize error message
v01dstar Feb 8, 2023
af0fca1
Merge remote-tracking branch 'upstream/master' into replica_read_backup
v01dstar Feb 8, 2023
27d5709
Fix bug
v01dstar Feb 8, 2023
9cfadae
Merge remote-tracking branch 'upstream/master' into replica_read_backup
v01dstar Feb 8, 2023
804869f
Merge branch 'master' into replica_read_backup
v01dstar Feb 8, 2023
acd3bee
Remove unused changes
v01dstar Feb 8, 2023
8835b91
Update br/pkg/task/backup.go
v01dstar Feb 8, 2023
3587c74
Merge branch 'master' into replica_read_backup
hawkingrei Feb 8, 2023
585a93b
Merge branch 'master' into replica_read_backup
ti-chi-bot Feb 9, 2023
8d9c953
Merge branch 'master' into replica_read_backup
ti-chi-bot Feb 9, 2023
add714f
Merge branch 'master' into replica_read_backup
ti-chi-bot Feb 9, 2023
78 changes: 59 additions & 19 deletions br/pkg/backup/client.go
@@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"strings"
"sync"
@@ -762,6 +763,7 @@ func (bc *Client) BackupRanges(
ranges []rtree.Range,
request backuppb.BackupRequest,
concurrency uint,
replicaReadLabel map[string]string,
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
) error {
@@ -791,7 +793,7 @@ func (bc *Client) BackupRanges(
}
workerPool.ApplyOnErrorGroup(eg, func() error {
elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
err := bc.BackupRange(elctx, req, pr, metaWriter, progressCallBack)
err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack)
if err != nil {
// The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear)
if errors.Cause(err) == context.Canceled {
@@ -811,6 +813,7 @@
func (bc *Client) BackupRange(
ctx context.Context,
request backuppb.BackupRequest,
replicaReadLabel map[string]string,
progressRange *rtree.ProgressRange,
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
@@ -831,11 +834,27 @@ func (bc *Client) BackupRange(
zap.Uint64("rateLimit", request.RateLimit),
zap.Uint32("concurrency", request.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
var targetStores []*metapb.Store
targetStoreIds := make(map[uint64]struct{})
if len(replicaReadLabel) == 0 {
targetStores = allStores // send backup push down request to all stores
} else {
for _, store := range allStores {
for _, label := range store.Labels {
if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value {
targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label
targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup
}
}
}
}
if len(replicaReadLabel) > 0 && len(targetStores) == 0 {
return errors.Errorf("no store matches replica read label: %v", replicaReadLabel)
}

logutil.CL(ctx).Info("backup push down started")
// either the `incomplete` is origin range itself,
@@ -859,8 +878,8 @@ func (bc *Client) BackupRange(
req.EndKey = progressRange.Incomplete[0].EndKey
}

push := newPushDown(bc.mgr, len(allStores))
err = push.pushBackup(ctx, req, progressRange, allStores, bc.checkpointRunner, progressCallBack)
push := newPushDown(bc.mgr, len(targetStores))
err = push.pushBackup(ctx, req, progressRange, targetStores, bc.checkpointRunner, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -869,7 +888,7 @@ func (bc *Client) BackupRange(

// Find and backup remaining ranges.
// TODO: test fine grained backup.
if err := bc.fineGrainedBackup(ctx, request, progressRange, progressCallBack); err != nil {
if err := bc.fineGrainedBackup(ctx, request, targetStoreIds, progressRange, progressCallBack); err != nil {
return errors.Trace(err)
}

@@ -912,34 +931,54 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
log.Error("find region failed", zap.Error(err), zap.Reflect("region", region))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
if len(targetStoreIds) == 0 {
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
}
} else {
candidates := make([]*metapb.Peer, 0, len(region.Meta.Peers))
for _, peer := range region.Meta.Peers {
if _, ok := targetStoreIds[peer.StoreId]; ok {
candidates = append(candidates, peer)
}
}
if len(candidates) > 0 {
peer := candidates[rand.Intn(len(candidates))]
log.Info("find target peer for backup",
zap.Reflect("Peer", peer), logutil.Key("key", key))
return peer, nil
}
}
log.Warn("no region found", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(100*i))

log.Warn("fail to find a target peer", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(1000*i))
continue
}
log.Error("can not find leader", logutil.Key("key", key))
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
log.Error("can not find a valid target peer", logutil.Key("key", key))
if len(targetStoreIds) == 0 {
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find a valid leader for key %s", key)
}
return nil, errors.Errorf("can not find a valid target peer for key %s", key)
}

func (bc *Client) fineGrainedBackup(
ctx context.Context,
req backuppb.BackupRequest,
targetStoreIds map[uint64]struct{},
pr *rtree.ProgressRange,
progressCallBack func(ProgressUnit),
) error {
@@ -990,7 +1029,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
subReq := req
subReq.StartKey, subReq.EndKey = rg.StartKey, rg.EndKey
backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, respCh)
backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, targetStoreIds, respCh)
if err != nil {
errCh <- err
return
@@ -1142,13 +1181,14 @@ func (bc *Client) handleFineGrained(
ctx context.Context,
bo *tikv.Backoffer,
req backuppb.BackupRequest,
targetStoreIds map[uint64]struct{},
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, req.StartKey, req.IsRawKv)
targetPeer, pderr := bc.findTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds)
if pderr != nil {
return 0, errors.Trace(pderr)
}
storeID := leader.GetStoreId()
storeID := targetPeer.GetStoreId()
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
if err != nil {
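To make the store-selection logic above easier to follow in isolation, here is a minimal, self-contained Go sketch of the same idea: filter stores by the replica-read label, then pick the peer used for fine-grained backup from those stores. The `Store`, `StoreLabel`, and `Peer` types are simplified stand-ins for the `metapb` types, and the function names are illustrative — this is not the PR's actual code.

```go
package main

import (
	"fmt"
	"math/rand"
)

// Simplified stand-ins for the metapb types used by the PR.
type StoreLabel struct{ Key, Value string }

type Store struct {
	ID     uint64
	Labels []StoreLabel
}

type Peer struct{ StoreID uint64 }

// filterStoresByLabel keeps only stores carrying a label that matches the
// replica-read label map, mirroring the selection added to BackupRange.
func filterStoresByLabel(all []Store, replicaReadLabel map[string]string) ([]Store, map[uint64]struct{}) {
	ids := make(map[uint64]struct{})
	if len(replicaReadLabel) == 0 {
		return all, ids // no label given: push backup down to every store
	}
	var target []Store
	for _, s := range all {
		for _, l := range s.Labels {
			if val, ok := replicaReadLabel[l.Key]; ok && val == l.Value {
				target = append(target, s)
				ids[s.ID] = struct{}{} // remembered for fine-grained backup
				break
			}
		}
	}
	return target, ids
}

// pickTargetPeer chooses the peer used for a fine-grained backup request:
// the region leader when no label is set, otherwise a random peer that
// lives on one of the label-matching stores.
func pickTargetPeer(leader Peer, peers []Peer, targetIDs map[uint64]struct{}) (Peer, bool) {
	if len(targetIDs) == 0 {
		return leader, true
	}
	candidates := make([]Peer, 0, len(peers))
	for _, p := range peers {
		if _, ok := targetIDs[p.StoreID]; ok {
			candidates = append(candidates, p)
		}
	}
	if len(candidates) == 0 {
		return Peer{}, false // caller retries, then fails the backup
	}
	return candidates[rand.Intn(len(candidates))], true
}

func main() {
	stores := []Store{
		{ID: 1, Labels: []StoreLabel{{Key: "engine", Value: "tikv"}}},
		{ID: 2, Labels: []StoreLabel{{Key: "backup", Value: "true"}}},
	}
	target, ids := filterStoresByLabel(stores, map[string]string{"backup": "true"})
	fmt.Println(len(target), ids) // 1 map[2:{}]

	peer, ok := pickTargetPeer(Peer{StoreID: 1}, []Peer{{StoreID: 1}, {StoreID: 2}}, ids)
	fmt.Println(peer, ok) // a peer on store 2, true
}
```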
43 changes: 34 additions & 9 deletions br/pkg/task/backup.go
@@ -51,6 +51,7 @@ const (
flagUseBackupMetaV2 = "use-backupmeta-v2"
flagUseCheckpoint = "use-checkpoint"
flagKeyspaceName = "keyspace-name"
flagReplicaReadLabel = "replica-read-label"

flagGCTTL = "gcttl"

@@ -77,14 +78,15 @@ type CompressionConfig struct {
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
ReplicaReadLabel map[string]string `json:"backup-replica-read-label" toml:"backup-replica-read-label"`
CompressionConfig

// for ebs-based backup
@@ -139,6 +141,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) {

flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
_ = flags.MarkHidden(flagUseCheckpoint)

flags.String(flagReplicaReadLabel, "", "specify the label of the stores to be used for backup, e.g. 'label_key:label_value'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
@@ -243,6 +247,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
}

cfg.ReplicaReadLabel, err = parseReplicaReadLabelFlag(flags)
if err != nil {
return errors.Trace(err)
}

return nil
}

@@ -485,6 +494,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
CipherInfo: &cfg.CipherInfo,
ReplicaRead: len(cfg.ReplicaReadLabel) != 0,
}
brVersion := g.GetVersion()
clusterVersion, err := mgr.GetClusterVersion(ctx)
@@ -619,7 +629,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}()
}
metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), metawriter, progressCallBack)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -737,3 +747,18 @@ func parseCompressionType(s string) (backuppb.CompressionType, error) {
}
return ct, nil
}

func parseReplicaReadLabelFlag(flags *pflag.FlagSet) (map[string]string, error) {
replicaReadLabelStr, err := flags.GetString(flagReplicaReadLabel)
if err != nil {
return nil, errors.Trace(err)
}
if replicaReadLabelStr == "" {
return nil, nil
}
kv := strings.Split(replicaReadLabelStr, ":")
if len(kv) != 2 {
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "invalid replica read label '%s'", replicaReadLabelStr)
}
return map[string]string{kv[0]: kv[1]}, nil
}
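As a quick reference for the flag format handled by `parseReplicaReadLabelFlag`, the following standalone sketch mirrors the parsing logic; the pflag plumbing and BR's `berrors.ErrInvalidArgument` are replaced with a plain error, and the `backup:true` label in the example is illustrative, not mandated by the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// parseReplicaReadLabel turns the "key:value" flag string into the label map
// consumed by BackupRanges; an empty string means replica read is disabled.
func parseReplicaReadLabel(s string) (map[string]string, error) {
	if s == "" {
		return nil, nil
	}
	kv := strings.Split(s, ":")
	if len(kv) != 2 {
		return nil, fmt.Errorf("invalid replica read label '%s'", s)
	}
	return map[string]string{kv[0]: kv[1]}, nil
}

func main() {
	label, err := parseReplicaReadLabel("backup:true")
	fmt.Println(label, err) // map[backup:true] <nil>

	_, err = parseReplicaReadLabel("not-a-pair")
	fmt.Println(err) // invalid replica read label 'not-a-pair'
}
```

On the command line, the value is passed as a single `key:value` pair via `--replica-read-label`, matching the flag registered in `DefineBackupFlags`.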
2 changes: 1 addition & 1 deletion br/pkg/task/backup_raw.go
@@ -224,7 +224,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo)
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRange(ctx, req, progressRange, metaWriter, progressCallBack)
err = client.BackupRange(ctx, req, map[string]string{}, progressRange, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/tests/_utils/run_services
@@ -191,7 +191,7 @@ start_services_impl() {
TIDB_CONFIG="tests/config/tidb.toml"
TIKV_CONFIG="tests/config/tikv.toml"
PD_CONFIG="tests/config/pd.toml"
RUN_TIFLASH=true
RUN_TIFLASH=false

while [[ $# -gt 0 ]]
do
3 changes: 3 additions & 0 deletions br/tests/br_db/run.sh
@@ -68,6 +68,9 @@ if [ "$table_count" -ne "2" ];then
fi

meta_count=$(run_sql "SHOW STATS_META where Row_count > 0;")
echo "sdfasdfasfd-----"
echo $meta_count

if [ "$meta_count" -ne "2" ];then
echo "TEST: [$TEST_NAME] failed!"
exit 1
25 changes: 25 additions & 0 deletions br/tests/br_replica_read/placement_rule_with_learner_template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[
{
"group_id": "pd",
"group_index": 0,
"group_override": false,
"rules": [
{
"group_id": "pd",
"id": "default",
"start_key": "",
"end_key": "",
"role": "voter",
"count": 2
},
{
"group_id": "pd",
"id": "learner",
"start_key": "",
"end_key": "",
"role": "learner",
"count": 1
}
]
}
]
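For context: this test fixture defines a single placement-rule group that gives each region two voter replicas and one learner replica. Presumably the br_replica_read integration test then labels the learner's store and runs the backup with a matching `--replica-read-label`, so it can verify that backup requests are served by that store rather than by the region leaders.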