br: Support backup replica read (#40899)
ref #40898
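
This change lets BR read backup data from stores that match a user-supplied label instead of always reading from region leaders. BackupRanges and BackupRange take a new replicaReadLabel map; when it is non-empty, push-down backup requests go only to matching stores, fine-grained retries pick a random peer on those stores, and the BackupRequest is sent with ReplicaRead set. The label is supplied on the command line via the new --replica-read-label flag, e.g. --replica-read-label '$mode:read_only'.
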
v01dstar authored Feb 9, 2023
1 parent 1167faf commit 9cb4c48
Showing 5 changed files with 208 additions and 29 deletions.
78 changes: 59 additions & 19 deletions br/pkg/backup/client.go
@@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"strings"
"sync"
@@ -758,6 +759,7 @@ func (bc *Client) BackupRanges(
ranges []rtree.Range,
request backuppb.BackupRequest,
concurrency uint,
replicaReadLabel map[string]string,
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
) error {
@@ -787,7 +789,7 @@ func (bc *Client) BackupRanges(
}
workerPool.ApplyOnErrorGroup(eg, func() error {
elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
err := bc.BackupRange(elctx, req, pr, metaWriter, progressCallBack)
err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack)
if err != nil {
// The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear)
if errors.Cause(err) == context.Canceled {
@@ -807,6 +809,7 @@
func (bc *Client) BackupRange(
ctx context.Context,
request backuppb.BackupRequest,
replicaReadLabel map[string]string,
progressRange *rtree.ProgressRange,
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
@@ -827,11 +830,27 @@ func (bc *Client) BackupRange(
zap.Uint64("rateLimit", request.RateLimit),
zap.Uint32("concurrency", request.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
var targetStores []*metapb.Store
targetStoreIds := make(map[uint64]struct{})
if len(replicaReadLabel) == 0 {
targetStores = allStores // send backup push down request to all stores
} else {
for _, store := range allStores {
for _, label := range store.Labels {
if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value {
targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label
targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup
}
}
}
}
if len(replicaReadLabel) > 0 && len(targetStores) == 0 {
return errors.Errorf("no store matches replica read label: %v", replicaReadLabel)
}

logutil.CL(ctx).Info("backup push down started")
// either the `incomplete` is origin range itself,
@@ -855,8 +874,8 @@ func (bc *Client) BackupRange(
req.EndKey = progressRange.Incomplete[0].EndKey
}

push := newPushDown(bc.mgr, len(allStores))
err = push.pushBackup(ctx, req, progressRange, allStores, bc.checkpointRunner, progressCallBack)
push := newPushDown(bc.mgr, len(targetStores))
err = push.pushBackup(ctx, req, progressRange, targetStores, bc.checkpointRunner, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -865,7 +884,7 @@ func (bc *Client) BackupRange(

// Find and backup remaining ranges.
// TODO: test fine grained backup.
if err := bc.fineGrainedBackup(ctx, request, progressRange, progressCallBack); err != nil {
if err := bc.fineGrainedBackup(ctx, request, targetStoreIds, progressRange, progressCallBack); err != nil {
return errors.Trace(err)
}

@@ -908,34 +927,54 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
for i := 0; i < 5; i++ {
// TODO: use a better backoff strategy instead of this fixed five-attempt loop.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
log.Error("find region failed", zap.Error(err), zap.Reflect("region", region))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
if len(targetStoreIds) == 0 {
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
}
} else {
candidates := make([]*metapb.Peer, 0, len(region.Meta.Peers))
for _, peer := range region.Meta.Peers {
if _, ok := targetStoreIds[peer.StoreId]; ok {
candidates = append(candidates, peer)
}
}
if len(candidates) > 0 {
peer := candidates[rand.Intn(len(candidates))]
log.Info("find target peer for backup",
zap.Reflect("Peer", peer), logutil.Key("key", key))
return peer, nil
}
}
log.Warn("no region found", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(100*i))

log.Warn("fail to find a target peer", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(1000*i))
continue
}
log.Error("can not find leader", logutil.Key("key", key))
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
log.Error("can not find a valid target peer", logutil.Key("key", key))
if len(targetStoreIds) == 0 {
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find a valid leader for key %s", key)
}
return nil, errors.Errorf("can not find a valid target peer for key %s", key)
}

func (bc *Client) fineGrainedBackup(
ctx context.Context,
req backuppb.BackupRequest,
targetStoreIds map[uint64]struct{},
pr *rtree.ProgressRange,
progressCallBack func(ProgressUnit),
) error {
@@ -986,7 +1025,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
subReq := req
subReq.StartKey, subReq.EndKey = rg.StartKey, rg.EndKey
backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, respCh)
backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, targetStoreIds, respCh)
if err != nil {
errCh <- err
return
@@ -1138,13 +1177,14 @@ func (bc *Client) handleFineGrained(
ctx context.Context,
bo *tikv.Backoffer,
req backuppb.BackupRequest,
targetStoreIds map[uint64]struct{},
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, req.StartKey, req.IsRawKv)
targetPeer, pderr := bc.findTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds)
if pderr != nil {
return 0, errors.Trace(pderr)
}
storeID := leader.GetStoreId()
storeID := targetPeer.GetStoreId()
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
if err != nil {
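
Taken together, the client.go changes boil down to two selection rules: filter the store list by the replica-read label, then pick a random matching peer for each fine-grained range (falling back to the leader when no label is set). The sketch below is a self-contained approximation under those assumptions — filterTargetStores, pickTargetPeer, and the trimmed types are hypothetical names for illustration, not BR's actual metapb/conn API — and the break after the first matching label is an addition here to keep a store from being appended twice.

package main

import (
	"fmt"
	"math/rand"
)

type Label struct{ Key, Value string }

type Store struct {
	ID     uint64
	Labels []Label
}

type Peer struct{ StoreID uint64 }

func filterTargetStores(all []Store, replicaReadLabel map[string]string) ([]Store, map[uint64]struct{}) {
	targetIDs := make(map[uint64]struct{})
	if len(replicaReadLabel) == 0 {
		return all, targetIDs // no label: push down to every store
	}
	var targets []Store
	for _, s := range all {
		for _, l := range s.Labels {
			if v, ok := replicaReadLabel[l.Key]; ok && v == l.Value {
				targets = append(targets, s)
				targetIDs[s.ID] = struct{}{}
				break // first matching label is enough; keeps a store from being added twice
			}
		}
	}
	return targets, targetIDs
}

func pickTargetPeer(leader *Peer, peers []*Peer, targetIDs map[uint64]struct{}) *Peer {
	if len(targetIDs) == 0 {
		return leader // default behavior: back up from the region leader
	}
	var candidates []*Peer
	for _, p := range peers {
		if _, ok := targetIDs[p.StoreID]; ok {
			candidates = append(candidates, p)
		}
	}
	if len(candidates) == 0 {
		return nil // caller retries or errors out, as findTargetPeer does
	}
	return candidates[rand.Intn(len(candidates))] // spread fine-grained load across matching peers
}

func main() {
	stores := []Store{
		{ID: 1, Labels: []Label{{Key: "$mode", Value: "read_only"}}},
		{ID: 2},
		{ID: 3},
	}
	_, ids := filterTargetStores(stores, map[string]string{"$mode": "read_only"})
	peers := []*Peer{{StoreID: 1}, {StoreID: 2}, {StoreID: 3}}
	fmt.Println(pickTargetPeer(peers[2], peers, ids).StoreID) // prints 1: the only labeled store
}
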
43 changes: 34 additions & 9 deletions br/pkg/task/backup.go
@@ -51,6 +51,7 @@ const (
flagUseBackupMetaV2 = "use-backupmeta-v2"
flagUseCheckpoint = "use-checkpoint"
flagKeyspaceName = "keyspace-name"
flagReplicaReadLabel = "replica-read-label"

flagGCTTL = "gcttl"

@@ -77,14 +78,15 @@ type CompressionConfig struct {
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
ReplicaReadLabel map[string]string `json:"replica-read-label" toml:"replica-read-label"`
CompressionConfig

// for ebs-based backup
@@ -139,6 +141,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) {

flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
_ = flags.MarkHidden(flagUseCheckpoint)

flags.String(flagReplicaReadLabel, "", "specify the label of the stores to be used for backup, e.g. 'label_key:label_value'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
@@ -243,6 +247,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
}

cfg.ReplicaReadLabel, err = parseReplicaReadLabelFlag(flags)
if err != nil {
return errors.Trace(err)
}

return nil
}

@@ -485,6 +494,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
CipherInfo: &cfg.CipherInfo,
ReplicaRead: len(cfg.ReplicaReadLabel) != 0,
}
brVersion := g.GetVersion()
clusterVersion, err := mgr.GetClusterVersion(ctx)
@@ -619,7 +629,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}()
}
metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), metawriter, progressCallBack)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -737,3 +747,18 @@ func parseCompressionType(s string) (backuppb.CompressionType, error) {
}
return ct, nil
}

func parseReplicaReadLabelFlag(flags *pflag.FlagSet) (map[string]string, error) {
replicaReadLabelStr, err := flags.GetString(flagReplicaReadLabel)
if err != nil {
return nil, errors.Trace(err)
}
if replicaReadLabelStr == "" {
return nil, nil
}
kv := strings.Split(replicaReadLabelStr, ":")
if len(kv) != 2 {
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "invalid replica read label '%s'", replicaReadLabelStr)
}
return map[string]string{kv[0]: kv[1]}, nil
}
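
As a quick illustration of the flag's value format, here is a minimal standalone sketch mirroring parseReplicaReadLabelFlag above (the behavior is reproduced outside pflag for brevity): a single "key:value" string becomes the one-entry map handed to BackupRanges, the empty string keeps the leader-based default, and anything that does not split into exactly two parts — including a value that itself contains ':' — is rejected.

package main

import (
	"fmt"
	"strings"
)

func parseReplicaReadLabel(s string) (map[string]string, error) {
	if s == "" {
		return nil, nil // flag unset: keep the leader-based default
	}
	kv := strings.Split(s, ":")
	if len(kv) != 2 {
		return nil, fmt.Errorf("invalid replica read label '%s'", s)
	}
	return map[string]string{kv[0]: kv[1]}, nil
}

func main() {
	label, err := parseReplicaReadLabel("$mode:read_only") // the value br_replica_read/run.sh passes
	fmt.Println(label, err) // map[$mode:read_only] <nil>
}
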
2 changes: 1 addition & 1 deletion br/pkg/task/backup_raw.go
@@ -224,7 +224,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo)
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRange(ctx, req, progressRange, metaWriter, progressCallBack)
err = client.BackupRange(ctx, req, map[string]string{}, progressRange, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
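
Raw KV backups are unchanged in behavior: RunBackupRaw passes an empty label map, so BackupRange keeps targeting all stores and fine-grained requests still go to region leaders.
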
25 changes: 25 additions & 0 deletions br/tests/br_replica_read/placement_rule_with_learner_template.json
@@ -0,0 +1,25 @@
[
{
"group_id": "pd",
"group_index": 0,
"group_override": false,
"rules": [
{
"group_id": "pd",
"id": "default",
"start_key": "",
"end_key": "",
"role": "voter",
"count": 2
},
{
"group_id": "pd",
"id": "learner",
"start_key": "",
"end_key": "",
"role": "learner",
"count": 1
}
]
}
]
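
Note the arithmetic run.sh applies to this template: with TIKV_COUNT stores it rewrites the voter count to TIKV_COUNT-1 and keeps the single learner, so each region has one replica on every store. Since PD never places two replicas of a region on the same store, this guarantees the store labeled '$mode:read_only' holds a peer for every region that the replica-read backup can read from.
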
89 changes: 89 additions & 0 deletions br/tests/br_replica_read/run.sh
@@ -0,0 +1,89 @@
#!/bin/sh
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"

VOTER_COUNT=$((TIKV_COUNT-1))
if [ "$VOTER_COUNT" -lt "1" ];then
echo "Skip test because there is no enough tikv"
exit 0
fi

# set random store to read only
random_store_id=$(run_pd_ctl -u https://$PD_ADDR store | jq 'first(.stores[]|select(.store.labels|(.!= null and any(.key == "engine" and .value=="tiflash"))| not)|.store.id)')
echo "random store id: $random_store_id"
run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' 'read_only'

# set placement rule to add a learner replica for each region in the read only store
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle load --out=$TEST_DIR/default_rules.json
cat tests/br_replica_read/placement_rule_with_learner_template.json | jq ".[].rules[0].count = $VOTER_COUNT" > $TEST_DIR/placement_rule_with_learner.json
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/placement_rule_with_learner.json
sleep 3 # wait for PD to apply the placement rule

run_sql "CREATE DATABASE $DB;"

run_sql "CREATE TABLE $DB.usertable1 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"

run_sql "CREATE TABLE $DB.usertable2 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"

# backup db
echo "backup start..."
run_br -u https://$PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --replica-read-label '$mode:read_only'

run_sql "DROP DATABASE $DB;"

# restore db
echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" -u https://$PD_ADDR

run_sql "select count(*) from $DB.usertable1;"
table1_count=$(read_result)
echo "table1 count: $table1_count"
if [ "$table1_count" -ne "2" ];then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

run_sql "select count(*) from $DB.usertable2;"
table2_count=$(read_result)
echo "table2 count: $table2_count"
if [ "$table2_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

# Test BR DDL query string
echo "testing DDL query..."
run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE TABLE'
run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE DATABASE'

run_sql "DROP DATABASE $DB;"
run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' ''
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/default_rules.json
