
Commit

apply patch
3pointer committed Jan 3, 2024
1 parent 1f49f2d commit 9859de1
Showing 15 changed files with 706 additions and 279 deletions.
10 changes: 6 additions & 4 deletions br/pkg/errors/errors.go
@@ -40,10 +40,12 @@ var (
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
ErrPDInvalidResponse = errors.Normalize("PD invalid response", errors.RFCCodeText("BR:PD:ErrPDInvalidResponse"))
ErrPDBatchScanRegion = errors.Normalize("batch scan region", errors.RFCCodeText("BR:PD:ErrPDBatchScanRegion"))
ErrPDUnknownScatterResult = errors.Normalize("failed to wait for region scatter", errors.RFCCodeText("BR:PD:ErrPDUnknownScatterResult"))
ErrPDSplitFailed = errors.Normalize("failed to wait for region split", errors.RFCCodeText("BR:PD:ErrPDSplitFailed"))

ErrBackupChecksumMismatch = errors.Normalize("backup checksum mismatch", errors.RFCCodeText("BR:Backup:ErrBackupChecksumMismatch"))
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
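
For context, errors.Normalize attaches an RFC-style code to each error so callers can match failures by identity rather than by message text. A minimal sketch of how one of the new errors might be raised and matched (a hypothetical call site, not part of this diff; assumes the pingcap/errors API):

package main

import (
	"github.com/pingcap/errors"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
)

// waitScatter is a hypothetical helper: it wraps a scatter-wait failure
// with the normalized error so callers can detect it by RFC code.
func waitScatter(done bool) error {
	if !done {
		return errors.Annotate(berrors.ErrPDUnknownScatterResult, "region still pending after retries")
	}
	return nil
}

func main() {
	if err := waitScatter(false); berrors.ErrPDUnknownScatterResult.Equal(err) {
		// Match by error identity (RFC code), not by message text.
		println("scatter result unknown:", err.Error())
	}
}
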
60 changes: 47 additions & 13 deletions br/pkg/restore/client.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/checkpoint"
"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
@@ -119,10 +120,12 @@ type Client struct {
dbPool []*DB
rateLimit uint64
isOnline bool
granularity string
noSchema bool
hasSpeedLimited bool

restoreStores []uint64
storeCount int

cipher *backuppb.CipherInfo
switchModeInterval time.Duration
@@ -457,6 +460,10 @@ func (rc *Client) GetDomain() *domain.Domain {
return rc.dom
}

func (rc *Client) GetStoreCount() int {
return rc.storeCount
}

// GetPDClient returns a pd client.
func (rc *Client) GetPDClient() pd.Client {
return rc.pdClient
@@ -523,10 +530,25 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend,
return nil
}

func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
storeWorkerPoolMap := make(map[uint64]chan struct{})
storeStatisticMap := make(map[uint64]*int64)
stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)
if err != nil {
log.Fatal("failed to get stores", zap.Error(err))
}
concurrencyPerStore := 512
for _, store := range stores {
ch := make(chan struct{}, concurrencyPerStore)
for i := 0; i < concurrencyPerStore; i += 1 {
ch <- struct{}{}
}
storeWorkerPoolMap[store.Id] = ch
storeStatisticMap[store.Id] = new(int64)
}
metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, rc.rewriteMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, storeStatisticMap, rc.rewriteMode)
}
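
The pre-filled buffered channels built here act as per-store counting semaphores: downloadSST later takes a token with a receive and returns it with a send, so at most concurrencyPerStore downloads hit a single TiKV store at once. The same pattern in isolation (illustrative names, not from this diff):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const tokens = 3 // stand-in for concurrencyPerStore
	sem := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
		sem <- struct{}{} // pre-fill so every token starts out available
	}

	var wg sync.WaitGroup
	for job := 0; job < 10; job++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-sem                                // acquire: blocks once all tokens are in use
			defer func() { sem <- struct{}{} }() // release on exit
			fmt.Println("download", id)          // at most `tokens` of these run concurrently
		}(job)
	}
	wg.Wait()
}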

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
@@ -566,7 +588,7 @@ func (rc *Client) InitBackupMeta(
}
rc.backupMeta = backupMeta

rc.InitClients(backend, backupMeta.IsRawKv, backupMeta.IsTxnKv)
rc.InitClients(c, backend, backupMeta.IsRawKv, backupMeta.IsTxnKv)
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
}
@@ -643,6 +665,16 @@ func (rc *Client) EnableOnline() {
rc.isOnline = true
}

// SetGranularity sets the granularity of the restore pipeline.
func (rc *Client) SetGranularity(g string) {
rc.granularity = g
}

// GetGranularity returns the granularity of the restore pipeline.
func (rc *Client) GetGranularity() string {
return rc.granularity
}
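
A hypothetical call site wiring the flag through from the restore command (rc is a *Client as above; CoarseGrained is the constant consumed in pipeline_items.go further down):

	// Illustrative only: opt this restore into the coarse-grained pipeline.
	rc.SetGranularity(string(CoarseGrained))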

// GetTLSConfig returns the tls config.
func (rc *Client) GetTLSConfig() *tls.Config {
return rc.tlsConf
@@ -1224,7 +1256,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, rc.rewriteMode)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, nil, rc.rewriteMode)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

@@ -2005,31 +2037,33 @@ const (

// LoadRestoreStores loads the stores used to restore data.
func (rc *Client) LoadRestoreStores(ctx context.Context) error {
if !rc.isOnline {
return nil
}
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.LoadRestoreStores", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)

stores, err := rc.pdClient.GetAllStores(ctx)
if err != nil {
return errors.Trace(err)
}
for _, s := range stores {
if s.GetState() != metapb.StoreState_Up {
continue
}
for _, l := range s.GetLabels() {
if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue {
rc.restoreStores = append(rc.restoreStores, s.GetId())
break
rc.storeCount++
if rc.isOnline {
for _, l := range s.GetLabels() {
if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue {
rc.restoreStores = append(rc.restoreStores, s.GetId())
break
}
}
}
}
log.Info("load restore stores", zap.Uint64s("store-ids", rc.restoreStores))
if rc.isOnline {
log.Info("load restore stores", zap.Uint64s("store-ids", rc.restoreStores))
}
return nil
}

67 changes: 55 additions & 12 deletions br/pkg/restore/import.go
Expand Up @@ -54,8 +54,9 @@ const (
)

const (
importScanRegionTime = 10 * time.Second
importScanRegionTime = 20 * time.Second
gRPCBackOffMaxDelay = 3 * time.Second
gRPCTimeOut = 10 * time.Minute
)

// RewriteMode is a mode flag that tells the TiKV how to handle the rewrite rules.
@@ -267,6 +268,10 @@ type FileImporter struct {
importClient ImporterClient
backend *backuppb.StorageBackend

storeWorkerPoolRWLock sync.RWMutex
storeWorkerPoolMap map[uint64]chan struct{}
storeStatisticMap map[uint64]*int64

kvMode KvMode
rawStartKey []byte
rawEndKey []byte
@@ -283,6 +288,8 @@ func NewFileImporter(
backend *backuppb.StorageBackend,
isRawKvMode bool,
isTxnKvMode bool,
storeWorkerPoolMap map[uint64]chan struct{},
storeStatisticMap map[uint64]*int64,
rewriteMode RewriteMode,
) FileImporter {
kvMode := TiDB
@@ -293,12 +300,14 @@ func NewFileImporter(
kvMode = Txn
}
return FileImporter{
metaClient: metaClient,
backend: backend,
importClient: importClient,
kvMode: kvMode,
rewriteMode: rewriteMode,
cacheKey: fmt.Sprintf("BR-%s-%d", time.Now().Format("20060102150405"), rand.Int63()),
metaClient: metaClient,
backend: backend,
importClient: importClient,
storeStatisticMap: storeStatisticMap,
storeWorkerPoolMap: storeWorkerPoolMap,
kvMode: kvMode,
rewriteMode: rewriteMode,
cacheKey: fmt.Sprintf("BR-%s-%d", time.Now().Format("20060102150405"), rand.Int63()),
}
}

@@ -569,17 +578,18 @@ func (importer *FileImporter) ImportSSTFiles(
log.Debug("download file done",
zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)),
logutil.Key("start", files[0].StartKey), logutil.Key("end", files[0].EndKey))
if errIngest := importer.ingest(ctx, info, downloadMetas); errIngest != nil {
start = time.Now()
if errIngest := importer.ingest(ctx, files, info, downloadMetas); errIngest != nil {
log.Warn("ingest file failed, retry later",
logutil.Files(files),
logutil.SSTMetas(downloadMetas),
logutil.Region(info.Region),
zap.Error(errIngest))
return errors.Trace(errIngest)
}
log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
}

log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
for _, f := range files {
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
@@ -719,7 +729,32 @@ func (importer *FileImporter) downloadSST(
for _, p := range regionInfo.Region.GetPeers() {
peer := p
eg.Go(func() error {
resp, err := importer.importClient.DownloadSST(ectx, peer.GetStoreId(), req)
importer.storeWorkerPoolRWLock.RLock()
workerCh := importer.storeWorkerPoolMap[peer.GetStoreId()]
statis := importer.storeStatisticMap[peer.GetStoreId()]
defer func() {
atomic.AddInt64(statis, -1)
workerCh <- struct{}{}
importer.storeWorkerPoolRWLock.RUnlock()
}()
<-workerCh
atomic.AddInt64(statis, 1)

var err error
var resp *import_sstpb.DownloadResponse
for i := 0; i < 5; i += 1 {
dctx, cancel := context.WithTimeout(ectx, gRPCTimeOut)
resp, err = importer.importClient.DownloadSST(dctx, peer.GetStoreId(), req)
cancel()
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
continue
}
return errors.Trace(err)
}

break
}
if err != nil {
return errors.Trace(err)
}
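
The loop above gives each DownloadSST attempt its own gRPCTimeOut budget and retries only when that budget is exhausted; any other error fails fast. A generic sketch of the same shape (an illustrative helper, not part of the diff; it matches context.DeadlineExceeded directly, whereas the code above string-matches the message because gRPC flattens the error):

package retryutil

import (
	"context"
	"errors"
	"time"
)

// callWithRetry retries fn up to attempts times, giving each attempt its own
// timeout, and retries only when the attempt ran out of time.
func callWithRetry[T any](ctx context.Context, attempts int, timeout time.Duration,
	fn func(context.Context) (T, error)) (resp T, err error) {
	for i := 0; i < attempts; i++ {
		attemptCtx, cancel := context.WithTimeout(ctx, timeout)
		resp, err = fn(attemptCtx)
		cancel()
		if err == nil || !errors.Is(err, context.DeadlineExceeded) {
			return resp, err // success, or an error that retrying will not fix
		}
	}
	return resp, err // every attempt timed out
}
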
@@ -832,10 +867,13 @@ func (importer *FileImporter) downloadRawKVSST(
}

func (importer *FileImporter) ingest(
ctx context.Context,
c context.Context,
files []*backuppb.File,
info *split.RegionInfo,
downloadMetas []*import_sstpb.SSTMeta,
) error {
ctx, cancel := context.WithTimeout(c, gRPCTimeOut)
defer cancel()
for {
ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info)
if errIngest != nil {
@@ -866,7 +904,10 @@ func (importer *FileImporter) ingest(
break
}
// do not get region info, wait a second and GetRegion() again.
log.Warn("get region by key return nil", logutil.Region(info.Region))
log.Warn("ingest get region by key return nil", logutil.Region(info.Region),
logutil.Files(files),
logutil.SSTMetas(downloadMetas),
)
time.Sleep(time.Second)
}
}
@@ -875,6 +916,8 @@ func (importer *FileImporter) ingest(
return errors.Trace(berrors.ErrKVEpochNotMatch)
}
log.Debug("ingest sst returns not leader error, retry it",
logutil.Files(files),
logutil.SSTMetas(downloadMetas),
logutil.Region(info.Region),
zap.Stringer("newLeader", newInfo.Leader))
info = newInfo
2 changes: 1 addition & 1 deletion br/pkg/restore/import_retry.go
@@ -257,7 +257,7 @@ func (r *RPCResult) StrategyForRetryGoError() RetryStrategy {
// we should unwrap the error or we cannot get the write gRPC status.
if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok {
switch gRPCErr.Code() {
case codes.Unavailable, codes.Aborted, codes.ResourceExhausted:
case codes.Unavailable, codes.Aborted, codes.ResourceExhausted, codes.DeadlineExceeded:
return StrategyFromThisRegion
}
}
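
Adding codes.DeadlineExceeded to this list means the per-attempt timeouts introduced in import.go feed back into the region-level retry loop instead of failing the whole restore. A small sketch of how the status-code mapping behaves (assumes gRPC's status and codes packages):

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	err := status.Error(codes.DeadlineExceeded, "download timed out")
	if s, ok := status.FromError(err); ok {
		switch s.Code() {
		case codes.Unavailable, codes.Aborted, codes.ResourceExhausted, codes.DeadlineExceeded:
			fmt.Println("retry from this region") // mirrors StrategyFromThisRegion
		default:
			fmt.Println("give up")
		}
	}
}
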
6 changes: 6 additions & 0 deletions br/pkg/restore/merge.go
@@ -119,10 +119,15 @@ func MergeFileRanges(
rangeTree := rtree.NewRangeTree()
for key := range filesMap {
files := filesMap[key]
rangeSize := uint64(0)
for _, f := range filesMap[key] {
rangeSize += f.Size_
}
if out := rangeTree.InsertRange(rtree.Range{
StartKey: files[0].GetStartKey(),
EndKey: files[0].GetEndKey(),
Files: files,
Size: rangeSize,
}); out != nil {
return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange,
"duplicate range %s files %+v", out, files)
@@ -136,6 +141,7 @@
continue
}
sortedRanges[i-1].EndKey = sortedRanges[i].EndKey
sortedRanges[i-1].Size += sortedRanges[i].Size
sortedRanges[i-1].Files = append(sortedRanges[i-1].Files, sortedRanges[i].Files...)
// TODO: this is slow when there are lots of ranges need to merge.
sortedRanges = append(sortedRanges[:i], sortedRanges[i+1:]...)
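
Carrying Size on each range means a merge also sums the data volume of the two ranges, so later stages can balance work by bytes rather than by file count. The accumulation in miniature (an illustrative struct mirroring the Size += line above, not the real rtree.Range):

package merge

// span is an illustrative stand-in for rtree.Range with only the fields
// relevant here.
type span struct {
	StartKey, EndKey []byte
	Size             uint64
}

// mergeAdjacent combines two adjacent spans: the key range widens and the
// sizes add, e.g. a 30 MiB span plus a 50 MiB span yields an 80 MiB span.
func mergeAdjacent(a, b span) span {
	return span{StartKey: a.StartKey, EndKey: b.EndKey, Size: a.Size + b.Size}
}
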
30 changes: 29 additions & 1 deletion br/pkg/restore/pipeline_items.go
Expand Up @@ -243,6 +243,7 @@ func NewTiKVSender(
cli TiKVRestorer,
updateCh glue.Progress,
splitConcurrency uint,
granularity string,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)
@@ -257,7 +258,13 @@

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.restoreWorker(ctx, midCh)
if granularity == string(CoarseGrained) {
outCh := make(chan drainResultAndDone, defaultChannelSize)
go sender.blockPipelineWorker(ctx, midCh, outCh)
go sender.restoreWorker(ctx, outCh)
} else {
go sender.restoreWorker(ctx, midCh)
}
return sender, nil
}

@@ -272,6 +279,26 @@ type drainResultAndDone struct {
done func()
}

func (b *tikvSender) blockPipelineWorker(ctx context.Context,
inCh <-chan drainResultAndDone,
outCh chan<- drainResultAndDone,
) {
defer close(outCh)
res := make([]drainResultAndDone, 0, defaultChannelSize)
for dr := range inCh {
res = append(res, dr)
}

for _, dr := range res {
select {
case <-ctx.Done():
return
default:
outCh <- dr
}
}
}
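
blockPipelineWorker is a barrier between the split and restore stages: it drains every split result before releasing any of them, so a coarse-grained restore only begins once all ranges are split and scattered. The same buffer-then-forward shape in miniature:

package main

import "fmt"

// bufferThenForward collects everything from in before emitting to out,
// turning a streaming hand-off into a stage barrier.
func bufferThenForward(in <-chan int, out chan<- int) {
	defer close(out)
	var buf []int
	for v := range in { // phase 1: drain the upstream stage completely
		buf = append(buf, v)
	}
	for _, v := range buf { // phase 2: release buffered work downstream
		out <- v
	}
}

func main() {
	in := make(chan int, 3)
	out := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)
	bufferThenForward(in, out)
	for v := range out {
		fmt.Println(v)
	}
}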

func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
@@ -385,6 +412,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
if !ok {
return
}

files := r.result.Files()
// There has been a worker in the `RestoreSSTFiles` procedure.
// Spawning a raw goroutine won't make too many requests to TiKV.