br: Support backup replica read #40899

Merged
merged 24 commits on Feb 9, 2023
Changes from 19 commits

Commits (24)
c66cb8f
Support backup replica read
v01dstar Dec 29, 2022
334598a
Clean up
v01dstar Jan 31, 2023
3ac6c75
Add integration test for backup replica read
v01dstar Feb 1, 2023
719ba97
Use store label instead of role for backup replica read
v01dstar Feb 2, 2023
9351c51
Use kvproto main branch
v01dstar Feb 6, 2023
f30d6e6
Fix typo
v01dstar Feb 6, 2023
7a955b3
Remove mistakenly added file
v01dstar Feb 6, 2023
16ddeb5
Make backup return earlier if no store matches the label
v01dstar Feb 6, 2023
1899385
Add fail case for backup replica read test
v01dstar Feb 6, 2023
b6f166a
Revert back placement when test is done
v01dstar Feb 6, 2023
72cf514
Fix test
v01dstar Feb 8, 2023
2304ece
Prolong the peer lookup retry time, fix test
v01dstar Feb 8, 2023
351d6e6
Stabilize test
v01dstar Feb 8, 2023
efdc3bb
Normalize error message
v01dstar Feb 8, 2023
af0fca1
Merge remote-tracking branch 'upstream/master' into replica_read_backup
v01dstar Feb 8, 2023
27d5709
Fix bug
v01dstar Feb 8, 2023
9cfadae
Merge remote-tracking branch 'upstream/master' into replica_read_backup
v01dstar Feb 8, 2023
804869f
Merge branch 'master' into replica_read_backup
v01dstar Feb 8, 2023
acd3bee
Remove unused changes
v01dstar Feb 8, 2023
8835b91
Update br/pkg/task/backup.go
v01dstar Feb 8, 2023
3587c74
Merge branch 'master' into replica_read_backup
hawkingrei Feb 8, 2023
585a93b
Merge branch 'master' into replica_read_backup
ti-chi-bot Feb 9, 2023
8d9c953
Merge branch 'master' into replica_read_backup
ti-chi-bot Feb 9, 2023
add714f
Merge branch 'master' into replica_read_backup
ti-chi-bot Feb 9, 2023
78 changes: 59 additions & 19 deletions br/pkg/backup/client.go
@@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"strings"
"sync"
@@ -758,6 +759,7 @@ func (bc *Client) BackupRanges(
ranges []rtree.Range,
request backuppb.BackupRequest,
concurrency uint,
replicaReadLabel map[string]string,
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
) error {
@@ -787,7 +789,7 @@
}
workerPool.ApplyOnErrorGroup(eg, func() error {
elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
err := bc.BackupRange(elctx, req, pr, metaWriter, progressCallBack)
err := bc.BackupRange(elctx, req, replicaReadLabel, pr, metaWriter, progressCallBack)
if err != nil {
// The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear)
if errors.Cause(err) == context.Canceled {
@@ -807,6 +809,7 @@
func (bc *Client) BackupRange(
ctx context.Context,
request backuppb.BackupRequest,
replicaReadLabel map[string]string,
progressRange *rtree.ProgressRange,
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
@@ -827,11 +830,27 @@ func (bc *Client) BackupRange(
zap.Uint64("rateLimit", request.RateLimit),
zap.Uint32("concurrency", request.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
var targetStores []*metapb.Store
targetStoreIds := make(map[uint64]struct{})
if len(replicaReadLabel) == 0 {
targetStores = allStores // send backup push down request to all stores
} else {
for _, store := range allStores {
for _, label := range store.Labels {
if val, ok := replicaReadLabel[label.Key]; ok && val == label.Value {
targetStores = append(targetStores, store) // send backup push down request to stores that match replica read label
targetStoreIds[store.GetId()] = struct{}{} // record store id for fine grained backup
}
}
}
}
if len(replicaReadLabel) > 0 && len(targetStores) == 0 {
return errors.Errorf("no store matches replica read label: %v", replicaReadLabel)
}

logutil.CL(ctx).Info("backup push down started")
// either the `incomplete` is origin range itself,
@@ -855,8 +874,8 @@ func (bc *Client) BackupRange(
req.EndKey = progressRange.Incomplete[0].EndKey
}

push := newPushDown(bc.mgr, len(allStores))
err = push.pushBackup(ctx, req, progressRange, allStores, bc.checkpointRunner, progressCallBack)
push := newPushDown(bc.mgr, len(targetStores))
err = push.pushBackup(ctx, req, progressRange, targetStores, bc.checkpointRunner, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -865,7 +884,7 @@ func (bc *Client) BackupRange(

// Find and backup remaining ranges.
// TODO: test fine grained backup.
if err := bc.fineGrainedBackup(ctx, request, progressRange, progressCallBack); err != nil {
if err := bc.fineGrainedBackup(ctx, request, targetStoreIds, progressRange, progressCallBack); err != nil {
return errors.Trace(err)
}

@@ -908,34 +927,54 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
log.Error("find region failed", zap.Error(err), zap.Reflect("region", region))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
if len(targetStoreIds) == 0 {
if region.Leader != nil {
log.Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
}
} else {
candidates := make([]*metapb.Peer, 0, len(region.Meta.Peers))
for _, peer := range region.Meta.Peers {
if _, ok := targetStoreIds[peer.StoreId]; ok {
candidates = append(candidates, peer)
}
}
if len(candidates) > 0 {
peer := candidates[rand.Intn(len(candidates))]
log.Info("find target peer for backup",
zap.Reflect("Peer", peer), logutil.Key("key", key))
return peer, nil
}
}
log.Warn("no region found", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(100*i))

log.Warn("fail to find a target peer", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(1000*i))
continue
}
log.Error("can not find leader", logutil.Key("key", key))
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
log.Error("can not find a valid target peer", logutil.Key("key", key))
if len(targetStoreIds) == 0 {
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find a valid leader for key %s", key)
}
return nil, errors.Errorf("can not find a valid target peer for key %s", key)
}

func (bc *Client) fineGrainedBackup(
ctx context.Context,
req backuppb.BackupRequest,
targetStoreIds map[uint64]struct{},
pr *rtree.ProgressRange,
progressCallBack func(ProgressUnit),
) error {
@@ -986,7 +1025,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
subReq := req
subReq.StartKey, subReq.EndKey = rg.StartKey, rg.EndKey
backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, respCh)
backoffMs, err := bc.handleFineGrained(ctx, boFork, subReq, targetStoreIds, respCh)
if err != nil {
errCh <- err
return
@@ -1138,13 +1177,14 @@ func (bc *Client) handleFineGrained(
ctx context.Context,
bo *tikv.Backoffer,
req backuppb.BackupRequest,
targetStoreIds map[uint64]struct{},
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, req.StartKey, req.IsRawKv)
targetPeer, pderr := bc.findTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds)
if pderr != nil {
return 0, errors.Trace(pderr)
}
storeID := leader.GetStoreId()
storeID := targetPeer.GetStoreId()
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
if err != nil {
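
For reference, the store-filtering step added to `BackupRange` above boils down to a small standalone function. The following is a simplified, self-contained sketch; the `Store`/`Label` types and the `filterStoresByLabel` name are stand-ins for illustration, not the actual TiDB/kvproto definitions:

```go
package main

import "fmt"

// Label and Store are simplified stand-ins for the kvproto metapb types.
type Label struct{ Key, Value string }
type Store struct {
	ID     uint64
	Labels []Label
}

// filterStoresByLabel keeps only stores carrying the requested replica-read
// label; an empty filter means "back up from every store" (the old behavior).
// It also returns the matching store IDs, used later by fine-grained backup.
func filterStoresByLabel(all []Store, replicaReadLabel map[string]string) ([]Store, map[uint64]struct{}) {
	ids := make(map[uint64]struct{})
	if len(replicaReadLabel) == 0 {
		return all, ids
	}
	var targets []Store
	for _, s := range all {
		for _, l := range s.Labels {
			if v, ok := replicaReadLabel[l.Key]; ok && v == l.Value {
				targets = append(targets, s)
				ids[s.ID] = struct{}{}
				break
			}
		}
	}
	return targets, ids
}

func main() {
	stores := []Store{
		{ID: 1, Labels: []Label{{Key: "$mode", Value: "read_only"}}},
		{ID: 2},
	}
	targets, ids := filterStoresByLabel(stores, map[string]string{"$mode": "read_only"})
	fmt.Println(len(targets), ids) // 1 map[1:{}]
}
```

As in the diff above, when the filter is non-empty but no store matches, the caller returns an error instead of silently backing up from all stores.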
43 changes: 34 additions & 9 deletions br/pkg/task/backup.go
@@ -51,6 +51,7 @@ const (
flagUseBackupMetaV2 = "use-backupmeta-v2"
flagUseCheckpoint = "use-checkpoint"
flagKeyspaceName = "keyspace-name"
flagReplicaReadLabel = "replica-read-label"

flagGCTTL = "gcttl"

@@ -77,14 +78,15 @@ type CompressionConfig struct {
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
ReplicaReadLabel map[string]string `json:"backup-replica-read-label" toml:"backup-replica-read-label"`
CompressionConfig

// for ebs-based backup
@@ -139,6 +141,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) {

flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
_ = flags.MarkHidden(flagUseCheckpoint)

flags.String(flagReplicaReadLabel, "", "specify the label of the stores to be used for backup, e.g. 'label_key:label_value'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
@@ -243,6 +247,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}
}

cfg.ReplicaReadLabel, err = parseReplicaReadLabelFlag(flags)
if err != nil {
return errors.Trace(err)
}

return nil
}

@@ -485,6 +494,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
CipherInfo: &cfg.CipherInfo,
ReplicaRead: len(cfg.ReplicaReadLabel) != 0,
}
brVersion := g.GetVersion()
clusterVersion, err := mgr.GetClusterVersion(ctx)
@@ -619,7 +629,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}()
}
metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), metawriter, progressCallBack)
err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.ReplicaReadLabel, metawriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -737,3 +747,18 @@ func parseCompressionType(s string) (backuppb.CompressionType, error) {
}
return ct, nil
}

func parseReplicaReadLabelFlag(flags *pflag.FlagSet) (map[string]string, error) {
replicaReadLabelStr, err := flags.GetString(flagReplicaReadLabel)
if err != nil {
return nil, errors.Trace(err)
}
if replicaReadLabelStr == "" {
return nil, nil
}
kv := strings.Split(replicaReadLabelStr, ":")
if len(kv) != 2 {
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "invalid replica read label '%s'", replicaReadLabelStr)
}
return map[string]string{kv[0]: kv[1]}, nil
}
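
Taken together, the flag plumbing in this file is small: parse the `key:value` string into a one-entry label map, then mark the backup request as a replica read whenever that map is non-empty. A minimal sketch of that flow follows; the `BackupRequest` type here is a stand-in for the kvproto message, and `parseReplicaReadLabel` is a simplified copy of `parseReplicaReadLabelFlag`, not the exact BR code:

```go
package main

import (
	"fmt"
	"strings"
)

// BackupRequest is a simplified stand-in for backuppb.BackupRequest.
type BackupRequest struct {
	ReplicaRead bool
}

// parseReplicaReadLabel turns a "key:value" flag value into a one-entry label map.
func parseReplicaReadLabel(s string) (map[string]string, error) {
	if s == "" {
		return nil, nil // flag not set: keep the default leader-read behavior
	}
	kv := strings.Split(s, ":")
	if len(kv) != 2 {
		return nil, fmt.Errorf("invalid replica read label '%s'", s)
	}
	return map[string]string{kv[0]: kv[1]}, nil
}

func main() {
	// Same label the br_replica_read integration test applies via pd-ctl.
	label, err := parseReplicaReadLabel("$mode:read_only")
	if err != nil {
		panic(err)
	}
	req := BackupRequest{ReplicaRead: len(label) != 0} // mirrors the condition in RunBackup above
	fmt.Println(req.ReplicaRead, label)                // true map[$mode:read_only]
}
```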
2 changes: 1 addition & 1 deletion br/pkg/task/backup_raw.go
@@ -224,7 +224,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo)
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
err = client.BackupRange(ctx, req, progressRange, metaWriter, progressCallBack)
err = client.BackupRange(ctx, req, map[string]string{}, progressRange, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
25 changes: 25 additions & 0 deletions br/tests/br_replica_read/placement_rule_with_learner_template.json
@@ -0,0 +1,25 @@
[
{
"group_id": "pd",
"group_index": 0,
"group_override": false,
"rules": [
{
"group_id": "pd",
"id": "default",
"start_key": "",
"end_key": "",
"role": "voter",
"count": 2
},
{
"group_id": "pd",
"id": "learner",
"start_key": "",
"end_key": "",
"role": "learner",
"count": 1
}
]
}
]
89 changes: 89 additions & 0 deletions br/tests/br_replica_read/run.sh
@@ -0,0 +1,89 @@
#!/bin/sh
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"

VOTER_COUNT=$((TIKV_COUNT-1))
if [ "$VOTER_COUNT" -lt "1" ];then
echo "Skip test because there is no enough tikv"
exit 0
fi

# set random store to read only
random_store_id=$(run_pd_ctl -u https://$PD_ADDR store | jq 'first(.stores[]|select(.store.labels|(.!= null and any(.key == "engine" and .value=="tiflash"))| not)|.store.id)')
echo "random store id: $random_store_id"
run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' 'read_only'

# set placement rule to add a learner replica for each region in the read only store
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle load --out=$TEST_DIR/default_rules.json
cat tests/br_replica_read/placement_rule_with_learner_template.json | jq ".[].rules[0].count = $VOTER_COUNT" > $TEST_DIR/placement_rule_with_learner.json
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/placement_rule_with_learner.json
sleep 3 # wait for PD to apply the placement rule

run_sql "CREATE DATABASE $DB;"

run_sql "CREATE TABLE $DB.usertable1 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"

run_sql "CREATE TABLE $DB.usertable2 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"

# backup db
echo "backup start..."
run_br -u https://$PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --replica-read-label '$mode:read_only'

run_sql "DROP DATABASE $DB;"

# restore db
echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" -u https://$PD_ADDR

run_sql "select count(*) from $DB.usertable1;"
table1_count=$(read_result)
echo "table1 count: $table1_count"
if [ "$table1_count" -ne "2" ];then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

run_sql "select count(*) from $DB.usertable2;"
table2_count=$(read_result)
echo "table2 count: $table2_count"
if [ "$table2_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

# Test BR DDL query string
echo "testing DDL query..."
run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE TABLE'
run_curl https://$TIDB_STATUS_ADDR/ddl/history | grep -E '/\*from\(br\)\*/CREATE DATABASE'

run_sql "DROP DATABASE $DB;"
run_pd_ctl -u https://$PD_ADDR store label $random_store_id '$mode' ''
run_pd_ctl -u https://$PD_ADDR config placement-rules rule-bundle save --in $TEST_DIR/default_rules.json