Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: refactor split code for two phase split. #48244

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ var (
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait region scattered", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait region splitted", errors.RFCCodeText("BR:PD:ErrPDUknownScatterResult"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
Expand Down
60 changes: 47 additions & 13 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/checkpoint"
"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
Expand Down Expand Up @@ -119,10 +120,12 @@ type Client struct {
dbPool []*DB
rateLimit uint64
isOnline bool
granularity string
noSchema bool
hasSpeedLimited bool

restoreStores []uint64
storeCount int

cipher *backuppb.CipherInfo
switchModeInterval time.Duration
Expand Down Expand Up @@ -457,6 +460,10 @@ func (rc *Client) GetDomain() *domain.Domain {
return rc.dom
}

func (rc *Client) GetStoreCount() int {
return rc.storeCount
}

// GetPDClient returns a pd client.
func (rc *Client) GetPDClient() pd.Client {
return rc.pdClient
Expand Down Expand Up @@ -523,10 +530,25 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
return nil
}

func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
storeWorkerPoolMap := make(map[uint64]chan struct{})
storeStatisticMap := make(map[uint64]*int64)
stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)
if err != nil {
log.Fatal("failed to get stores", zap.Error(err))
}
concurrencyPerStore := 512
for _, store := range stores {
ch := make(chan struct{}, concurrencyPerStore)
for i := 0; i < concurrencyPerStore; i += 1 {
ch <- struct{}{}
}
storeWorkerPoolMap[store.Id] = ch
storeStatisticMap[store.Id] = new(int64)
}
metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, rc.rewriteMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, storeStatisticMap, rc.rewriteMode)
}

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
Expand Down Expand Up @@ -566,7 +588,7 @@ func (rc *Client) InitBackupMeta(
}
rc.backupMeta = backupMeta

rc.InitClients(backend, backupMeta.IsRawKv, backupMeta.IsTxnKv)
rc.InitClients(c, backend, backupMeta.IsRawKv, backupMeta.IsTxnKv)
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
}
Expand Down Expand Up @@ -643,6 +665,16 @@ func (rc *Client) EnableOnline() {
rc.isOnline = true
}

// SetGranularity sets the ganularity of restore pipeline.
func (rc *Client) SetGranularity(g string) {
rc.granularity = g
}

// GetGranularity sets the ganularity of restore pipeline.
func (rc *Client) GetGranularity() string {
return rc.granularity
}

// GetTLSConfig returns the tls config.
func (rc *Client) GetTLSConfig() *tls.Config {
return rc.tlsConf
Expand Down Expand Up @@ -1224,7 +1256,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, rc.rewriteMode)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, nil, rc.rewriteMode)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

Expand Down Expand Up @@ -2005,31 +2037,33 @@ const (

// LoadRestoreStores loads the stores used to restore data.
func (rc *Client) LoadRestoreStores(ctx context.Context) error {
if !rc.isOnline {
return nil
}
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.LoadRestoreStores", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)

stores, err := rc.pdClient.GetAllStores(ctx)
if err != nil {
return errors.Trace(err)
}
for _, s := range stores {
if s.GetState() != metapb.StoreState_Up {
continue
}
for _, l := range s.GetLabels() {
if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue {
rc.restoreStores = append(rc.restoreStores, s.GetId())
break
rc.storeCount++
if rc.isOnline {
for _, l := range s.GetLabels() {
if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue {
rc.restoreStores = append(rc.restoreStores, s.GetId())
break
}
}
}
}
log.Info("load restore stores", zap.Uint64s("store-ids", rc.restoreStores))
if rc.isOnline {
log.Info("load restore stores", zap.Uint64s("store-ids", rc.restoreStores))
}
return nil
}

Expand Down
67 changes: 55 additions & 12 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ const (
)

const (
importScanRegionTime = 10 * time.Second
importScanRegionTime = 20 * time.Second
gRPCBackOffMaxDelay = 3 * time.Second
gRPCTimeOut = 10 * time.Minute
)

// RewriteMode is a mode flag that tells the TiKV how to handle the rewrite rules.
Expand Down Expand Up @@ -267,6 +268,10 @@ type FileImporter struct {
importClient ImporterClient
backend *backuppb.StorageBackend

storeWorkerPoolRWLock sync.RWMutex
storeWorkerPoolMap map[uint64]chan struct{}
storeStatisticMap map[uint64]*int64

kvMode KvMode
rawStartKey []byte
rawEndKey []byte
Expand All @@ -283,6 +288,8 @@ func NewFileImporter(
backend *backuppb.StorageBackend,
isRawKvMode bool,
isTxnKvMode bool,
storeWorkerPoolMap map[uint64]chan struct{},
storeStatisticMap map[uint64]*int64,
rewriteMode RewriteMode,
) FileImporter {
kvMode := TiDB
Expand All @@ -293,12 +300,14 @@ func NewFileImporter(
kvMode = Txn
}
return FileImporter{
metaClient: metaClient,
backend: backend,
importClient: importClient,
kvMode: kvMode,
rewriteMode: rewriteMode,
cacheKey: fmt.Sprintf("BR-%s-%d", time.Now().Format("20060102150405"), rand.Int63()),
metaClient: metaClient,
backend: backend,
importClient: importClient,
storeStatisticMap: storeStatisticMap,
storeWorkerPoolMap: storeWorkerPoolMap,
kvMode: kvMode,
rewriteMode: rewriteMode,
cacheKey: fmt.Sprintf("BR-%s-%d", time.Now().Format("20060102150405"), rand.Int63()),
}
}

Expand Down Expand Up @@ -569,17 +578,18 @@ func (importer *FileImporter) ImportSSTFiles(
log.Debug("download file done",
zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)),
logutil.Key("start", files[0].StartKey), logutil.Key("end", files[0].EndKey))
if errIngest := importer.ingest(ctx, info, downloadMetas); errIngest != nil {
start = time.Now()
if errIngest := importer.ingest(ctx, files, info, downloadMetas); errIngest != nil {
log.Warn("ingest file failed, retry later",
logutil.Files(files),
logutil.SSTMetas(downloadMetas),
logutil.Region(info.Region),
zap.Error(errIngest))
return errors.Trace(errIngest)
}
log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
}

log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
for _, f := range files {
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
Expand Down Expand Up @@ -719,7 +729,32 @@ func (importer *FileImporter) downloadSST(
for _, p := range regionInfo.Region.GetPeers() {
peer := p
eg.Go(func() error {
resp, err := importer.importClient.DownloadSST(ectx, peer.GetStoreId(), req)
importer.storeWorkerPoolRWLock.RLock()
workerCh := importer.storeWorkerPoolMap[peer.GetStoreId()]
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
statis := importer.storeStatisticMap[peer.GetStoreId()]
defer func() {
atomic.AddInt64(statis, -1)
workerCh <- struct{}{}
importer.storeWorkerPoolRWLock.RUnlock()
}()
<-workerCh
atomic.AddInt64(statis, 1)

var err error
var resp *import_sstpb.DownloadResponse
for i := 0; i < 5; i += 1 {
dctx, cancel := context.WithTimeout(ectx, gRPCTimeOut)
resp, err = importer.importClient.DownloadSST(dctx, peer.GetStoreId(), req)
cancel()
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
continue
}
return errors.Trace(err)
}

break
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -832,10 +867,13 @@ func (importer *FileImporter) downloadRawKVSST(
}

func (importer *FileImporter) ingest(
ctx context.Context,
c context.Context,
files []*backuppb.File,
info *split.RegionInfo,
downloadMetas []*import_sstpb.SSTMeta,
) error {
ctx, cancel := context.WithTimeout(c, gRPCTimeOut)
defer cancel()
for {
ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info)
if errIngest != nil {
Expand Down Expand Up @@ -866,7 +904,10 @@ func (importer *FileImporter) ingest(
break
}
// do not get region info, wait a second and GetRegion() again.
log.Warn("get region by key return nil", logutil.Region(info.Region))
log.Warn("ingest get region by key return nil", logutil.Region(info.Region),
logutil.Files(files),
logutil.SSTMetas(downloadMetas),
)
time.Sleep(time.Second)
}
}
Expand All @@ -875,6 +916,8 @@ func (importer *FileImporter) ingest(
return errors.Trace(berrors.ErrKVEpochNotMatch)
}
log.Debug("ingest sst returns not leader error, retry it",
logutil.Files(files),
logutil.SSTMetas(downloadMetas),
logutil.Region(info.Region),
zap.Stringer("newLeader", newInfo.Leader))
info = newInfo
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/import_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (r *RPCResult) StrategyForRetryGoError() RetryStrategy {
// we should unwrap the error or we cannot get the write gRPC status.
if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok {
switch gRPCErr.Code() {
case codes.Unavailable, codes.Aborted, codes.ResourceExhausted:
case codes.Unavailable, codes.Aborted, codes.ResourceExhausted, codes.DeadlineExceeded:
return StrategyFromThisRegion
}
}
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/restore/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,15 @@ func MergeFileRanges(
rangeTree := rtree.NewRangeTree()
for key := range filesMap {
files := filesMap[key]
rangeSize := uint64(0)
for _, f := range filesMap[key] {
rangeSize += f.Size_
}
if out := rangeTree.InsertRange(rtree.Range{
StartKey: files[0].GetStartKey(),
EndKey: files[0].GetEndKey(),
Files: files,
Size: rangeSize,
}); out != nil {
return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange,
"duplicate range %s files %+v", out, files)
Expand All @@ -136,6 +141,7 @@ func MergeFileRanges(
continue
}
sortedRanges[i-1].EndKey = sortedRanges[i].EndKey
sortedRanges[i-1].Size += sortedRanges[i].Size
sortedRanges[i-1].Files = append(sortedRanges[i-1].Files, sortedRanges[i].Files...)
// TODO: this is slow when there are lots of ranges need to merge.
sortedRanges = append(sortedRanges[:i], sortedRanges[i+1:]...)
Expand Down
30 changes: 29 additions & 1 deletion br/pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func NewTiKVSender(
cli TiKVRestorer,
updateCh glue.Progress,
splitConcurrency uint,
granularity string,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)
Expand All @@ -257,7 +258,13 @@ func NewTiKVSender(

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.restoreWorker(ctx, midCh)
if granularity == string(CoarseGrained) {
outCh := make(chan drainResultAndDone, defaultChannelSize)
go sender.blockPipelineWorker(ctx, midCh, outCh)
go sender.restoreWorker(ctx, outCh)
} else {
go sender.restoreWorker(ctx, midCh)
}
return sender, nil
}

Expand All @@ -272,6 +279,26 @@ type drainResultAndDone struct {
done func()
}

func (b *tikvSender) blockPipelineWorker(ctx context.Context,
inCh <-chan drainResultAndDone,
outCh chan<- drainResultAndDone,
) {
defer close(outCh)
res := make([]drainResultAndDone, 0, defaultChannelSize)
for dr := range inCh {
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
res = append(res, dr)
}

for _, dr := range res {
select {
case <-ctx.Done():
return
default:
outCh <- dr
}
}
}

func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
Expand Down Expand Up @@ -385,6 +412,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul
if !ok {
return
}

files := r.result.Files()
// There has been a worker in the `RestoreSSTFiles` procedure.
// Spawning a raw goroutine won't make too many requests to TiKV.
Expand Down
Loading