Skip to content

Commit

Permalink
br: Fix stuck when meeting error (#33201) (#33269)
Browse files Browse the repository at this point in the history
close #33200
  • Loading branch information
ti-srebot authored Mar 25, 2022
1 parent 9277dc9 commit eacba78
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 9 deletions.
10 changes: 10 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -807,6 +808,15 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
return files[:idx], files[idx:]
}

// SplitRanges implements TiKVRestorer.
func (rc *Client) SplitRanges(ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool) error {
return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv)
}

// RestoreFiles tries to restore the files.
func (rc *Client) RestoreFiles(
ctx context.Context,
Expand Down
40 changes: 31 additions & 9 deletions br/pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"time"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/summary"
Expand Down Expand Up @@ -183,8 +185,27 @@ type BatchSender interface {
Close()
}

// TiKVRestorer is the minimal methods required for restoring.
// It contains the primitive APIs extract from `restore.Client`, so some of arguments may seem redundant.
// Maybe TODO: make a better abstraction?
type TiKVRestorer interface {
// SplitRanges split regions implicated by the ranges and rewrite rules.
// After spliting, it also scatters the fresh regions.
SplitRanges(ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool) error
// RestoreFiles import the files to the TiKV.
RestoreFiles(ctx context.Context,
files []*backuppb.File,
rewriteRules *RewriteRules,
updateCh glue.Progress) error
}

type tikvSender struct {
client *Client
client TiKVRestorer

updateCh glue.Progress

sink TableSink
Expand All @@ -209,7 +230,7 @@ func (b *tikvSender) RestoreBatch(ranges DrainResult) {
// NewTiKVSender make a sender that send restore requests to TiKV.
func NewTiKVSender(
ctx context.Context,
cli *Client,
cli TiKVRestorer,
updateCh glue.Progress,
splitConcurrency uint,
) (BatchSender, error) {
Expand Down Expand Up @@ -252,9 +273,9 @@ func (b *tikvSender) splitWorker(ctx context.Context,
b.wg.Done()
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
close(next)
log.Info("TiKV Sender: split worker exits.")
}()

start := time.Now()
Expand All @@ -266,7 +287,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
pool := utils.NewWorkerPool(concurrency, "split")
for {
select {
case <-ctx.Done():
case <-ectx.Done():
return
case result, ok := <-ranges:
if !ok {
Expand All @@ -289,7 +310,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh, false)
err := b.client.SplitRanges(ectx, result.Ranges, result.RewriteRules, b.updateCh, false)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
Expand Down Expand Up @@ -338,17 +359,17 @@ func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Debug("restore worker closed")
log.Info("TiKV Sender: restore worker prepare to close.")
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
b.wg.Done()
b.sink.Close()
b.wg.Done()
log.Info("TiKV Sender: restore worker exits.")
}()
for {
select {
case <-ctx.Done():
case <-ectx.Done():
return
case r, ok := <-ranges:
if !ok {
Expand All @@ -360,6 +381,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
log.Error("restore batch meet error", logutil.ShortError(e), logutil.Files(files))
r.done()
return e
}
Expand Down
125 changes: 125 additions & 0 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import (
"time"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -560,3 +565,123 @@ func TestRegionConsistency(t *testing.T) {
require.Regexp(t, ca.err, err.Error())
}
}

type fakeRestorer struct {
errorInSplit bool
splitRanges []rtree.Range
restoredFiles []*backuppb.File
}

func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *restore.RewriteRules, updateCh glue.Progress, isRawKv bool) error {
if ctx.Err() != nil {
return ctx.Err()
}
f.splitRanges = append(f.splitRanges, ranges...)
if f.errorInSplit {
err := errors.Annotatef(berrors.ErrRestoreSplitFailed,
"the key space takes many efforts and finally get together, how dare you split them again... :<")
log.Error("error happens :3", logutil.ShortError(err))
return err
}
return nil
}

func (f *fakeRestorer) RestoreFiles(ctx context.Context, files []*backuppb.File, rewriteRules *restore.RewriteRules, updateCh glue.Progress) error {
if ctx.Err() != nil {
return ctx.Err()
}
f.restoredFiles = append(f.restoredFiles, files...)
err := errors.Annotatef(berrors.ErrRestoreWriteAndIngest, "the files to restore are taken by a hijacker, meow :3")
log.Error("error happens :3", logutil.ShortError(err))
return err
}

func fakeRanges(keys ...string) (r restore.DrainResult) {
for i := range keys {
if i+1 == len(keys) {
return
}
r.Ranges = append(r.Ranges, rtree.Range{
StartKey: []byte(keys[i]),
EndKey: []byte(keys[i+1]),
Files: []*backuppb.File{{Name: "fake.sst"}},
})
}
return
}

type errorInTimeSink struct {
ctx context.Context
errCh chan error
t *testing.T
}

func (e errorInTimeSink) EmitTables(tables ...restore.CreatedTable) {}

func (e errorInTimeSink) EmitError(err error) {
e.errCh <- err
}

func (e errorInTimeSink) Close() {}

func (e errorInTimeSink) Wait() {
select {
case <-e.ctx.Done():
e.t.Logf("The context is canceled but no error happen")
e.t.FailNow()
case <-e.errCh:
}
}

func assertErrorEmitInTime(ctx context.Context, t *testing.T) errorInTimeSink {
errCh := make(chan error, 1)
return errorInTimeSink{
ctx: ctx,
errCh: errCh,
t: t,
}
}

func TestRestoreFailed(t *testing.T) {
ranges := []restore.DrainResult{
fakeRanges("aax", "abx", "abz"),
fakeRanges("abz", "bbz", "bcy"),
fakeRanges("bcy", "cad", "xxy"),
}
r := &fakeRestorer{}
sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1)
require.NoError(t, err)
dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
sink := assertErrorEmitInTime(dctx, t)
sender.PutSink(sink)
for _, r := range ranges {
sender.RestoreBatch(r)
}
sink.Wait()
sink.Close()
sender.Close()
require.GreaterOrEqual(t, len(r.restoredFiles), 1)
}

func TestSplitFailed(t *testing.T) {
ranges := []restore.DrainResult{
fakeRanges("aax", "abx", "abz"),
fakeRanges("abz", "bbz", "bcy"),
fakeRanges("bcy", "cad", "xxy"),
}
r := &fakeRestorer{errorInSplit: true}
sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1)
require.NoError(t, err)
dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
sink := assertErrorEmitInTime(dctx, t)
sender.PutSink(sink)
for _, r := range ranges {
sender.RestoreBatch(r)
}
sink.Wait()
sender.Close()
require.GreaterOrEqual(t, len(r.splitRanges), 2)
require.Len(t, r.restoredFiles, 0)
}

0 comments on commit eacba78

Please sign in to comment.