lightning: always retry from needRescan when NotLeader #43079

Merged · 4 commits · Apr 17, 2023

Changes from all commits
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/BUILD.bazel
@@ -104,7 +104,7 @@ go_test(
embed = [":local"],
flaky = True,
race = "on",
- shard_count = 48,
+ shard_count = 49,
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
121 changes: 119 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
@@ -697,10 +697,15 @@ func newMockImportClient() *mockImportClient {
}
}

- func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) {
+ func (c *mockImportClient) MultiIngest(_ context.Context, req *sst.MultiIngestRequest, _ ...grpc.CallOption) (*sst.IngestResponse, error) {
defer func() {
c.cnt++
}()
+ for _, meta := range req.Ssts {
+ if meta.RegionId != c.store.GetId() {
+ return &sst.IngestResponse{Error: &errorpb.Error{Message: "The file which would be ingested does not exist."}}, nil
+ }
+ }
if c.apiInvokeRecorder != nil {
c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId())
}
@@ -732,7 +737,7 @@ func (c *mockImportClient) Write(ctx context.Context, opts ...grpc.CallOption) (
c.apiInvokeRecorder["Write"] = append(c.apiInvokeRecorder["Write"], c.store.GetId())
}
return mockWriteClient{writeResp: &sst.WriteResponse{Metas: []*sst.SSTMeta{
- {}, {}, {},
+ {RegionId: c.store.GetId()},
}}}, nil
}

@@ -1230,6 +1235,118 @@ func TestCheckPeersBusy(t *testing.T) {
require.Equal(t, []byte("b"), retryJob.keyRange.end)
}

+ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // test that lightning, holding stale region info (peers 1,2,3), recovers after the region has moved to peers (11,12,13)
+ apiInvokeRecorder := map[string][]uint64{}
+ notLeaderResp := &sst.IngestResponse{
+ Error: &errorpb.Error{
+ NotLeader: &errorpb.NotLeader{Leader: &metapb.Peer{StoreId: 11}},
+ }}
+
+ local := &Backend{
+ splitCli: initTestSplitClient3Replica([][]byte{{}, {'a'}, {}}, nil),
+ importClientFactory: &mockImportClientFactory{
+ stores: []*metapb.Store{
+ {Id: 1}, {Id: 2}, {Id: 3},
+ {Id: 11}, {Id: 12}, {Id: 13},
+ },
+ createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
+ importCli := newMockImportClient()
+ importCli.store = store
+ importCli.apiInvokeRecorder = apiInvokeRecorder
+ if store.Id == 1 {
+ importCli.retry = 1
+ importCli.resp = notLeaderResp
+ }
+ return importCli
+ },
+ },
+ logger: log.L(),
+ writeLimiter: noopStoreWriteLimiter{},
+ bufferPool: membuf.NewPool(),
+ supportMultiIngest: true,
+ BackendConfig: BackendConfig{
+ ShouldCheckWriteStall: true,
+ },
+ tikvCodec: keyspace.CodecV1,
+ }
+
+ db, tmpPath := makePebbleDB(t, nil)
+ _, engineUUID := backend.MakeUUID("ww", 0)
+ engineCtx, cancel2 := context.WithCancel(context.Background())
+ f := &Engine{
+ db: db,
+ UUID: engineUUID,
+ sstDir: tmpPath,
+ ctx: engineCtx,
+ cancel: cancel2,
+ sstMetasChan: make(chan metaOrFlush, 64),
+ keyAdapter: noopKeyAdapter{},
+ logger: log.L(),
+ }
+ err := f.db.Set([]byte("a"), []byte("a"), nil)
+ require.NoError(t, err)
+
+ jobCh := make(chan *regionJob, 10)
+
+ staleJob := &regionJob{
+ keyRange: Range{start: []byte("a"), end: []byte("")},
+ region: &split.RegionInfo{
+ Region: &metapb.Region{
+ Id: 1,
+ Peers: []*metapb.Peer{
+ {Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3},
+ },
+ StartKey: []byte("a"),
+ EndKey: []byte(""),
+ },
+ Leader: &metapb.Peer{Id: 1, StoreId: 1},
+ },
+ stage: regionScanned,
+ engine: f,
+ }
+ var jobWg sync.WaitGroup
+ jobWg.Add(1)
+ jobCh <- staleJob
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ jobOutCh := make(chan *regionJob)
+ go func() {
+ defer wg.Done()
+ for {
+ job := <-jobOutCh
+ if job.stage == ingested {
+ jobWg.Done()
+ return
+ }
+ jobCh <- job
+ }
+ }()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
+ require.NoError(t, err)
+ }()
+
+ jobWg.Wait()
+ cancel()
+ wg.Wait()
+
+ // "ingest" first checks write stall on the stale region peers: 1, 2, 3
+ // then "write" goes to the stale region: 1, 2, 3
+ // then "ingest" goes to the stale leader: 1
+ // the leader replies NotLeader, so the job rescans and finds the new region (11, 12, 13)
+ // the same sequence then repeats for 11, 12, 13
+ require.Equal(t, []uint64{1, 2, 3, 11, 12, 13}, apiInvokeRecorder["Write"])
+ // each region gets three write-stall checks plus one real ingest to its leader,
+ // so the leaders 1 and 11 each appear twice
+ require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"])
+ }

// mockGetSizeProperties mocks the size properties of 20 SST files of 50MB each.
func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) {
props := newSizeProperties()
43 changes: 5 additions & 38 deletions br/pkg/lightning/backend/local/region_job.go
@@ -386,7 +386,6 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
// if any underlying logic returns an error, ingest will return the error to let
// the caller handle it.
func (local *Backend) ingest(ctx context.Context, j *regionJob) error {
- splitCli := local.splitCli
if j.stage != wrote {
return nil
}
@@ -418,7 +417,7 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) error {
logutil.Region(j.region.Region), logutil.Leader(j.region.Leader))
continue
}
- canContinue, err := j.convertStageOnIngestError(ctx, resp, splitCli)
+ canContinue, err := j.convertStageOnIngestError(resp)
if common.IsContextCanceledError(err) {
return err
}
@@ -559,53 +558,21 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) {
// Return (true, nil) when the job can retry ingesting immediately.
// Return (false, nil) when the job should be put back to the queue.
func (j *regionJob) convertStageOnIngestError(
- ctx context.Context,
resp *sst.IngestResponse,
- splitCli split.SplitClient,
) (bool, error) {
if resp.GetError() == nil {
return true, nil
}

- getRegion := func() (*split.RegionInfo, error) {
- for i := 0; ; i++ {
- newRegion, err := splitCli.GetRegion(ctx, j.region.Region.GetStartKey())
- if err != nil {
- return nil, errors.Trace(err)
- }
- if newRegion != nil {
- return newRegion, nil
- }
- log.FromContext(ctx).Warn("get region by key return nil, will retry",
- logutil.Region(j.region.Region), logutil.Leader(j.region.Leader),
- zap.Int("retry", i))
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-time.After(time.Second):
- }
- }
- }

- var newRegion *split.RegionInfo
- var err error
switch errPb := resp.GetError(); {
case errPb.NotLeader != nil:
j.lastRetryableErr = common.ErrKVNotLeader.GenWithStack(errPb.GetMessage())

- if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil {
- newRegion = &split.RegionInfo{
- Leader: newLeader,
- Region: j.region.Region,
- }
- } else {
- newRegion, err = getRegion()
- if err != nil {
- return false, errors.Trace(err)
- }
- }
- j.region = newRegion
- return true, nil
+ // we have seen a case where the region leader and peers were all updated, but the
+ // returned error was only "NotLeader", so always rescan to refresh the whole region info
+ j.convertStageTo(needRescan)
+ return false, nil
case errPb.EpochNotMatch != nil:
j.lastRetryableErr = common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())

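For context beyond the diff: the deleted getRegion path patched only j.region.Leader when the NotLeader error carried a new leader, and rescanned otherwise; after this change every NotLeader converts the job to needRescan, so the caller always refetches the whole region (leader and peers) before retrying. Below is a minimal standalone sketch of that retry shape, not Lightning's actual worker code: the stage names mirror the diff, but the loop and the fake ingestOnce helper are hypothetical stand-ins.

package main

import "fmt"

// jobStage mirrors the stage names used in the diff; only the control flow is modeled.
type jobStage int

const (
    regionScanned jobStage = iota
    wrote
    ingested
    needRescan
)

// ingestOnce stands in for the real MultiIngest RPC: the first attempt answers
// the way a stale leader would (NotLeader), the second succeeds. Purely illustrative.
func ingestOnce(attempt int) error {
    if attempt == 0 {
        return fmt.Errorf("NotLeader")
    }
    return nil
}

func main() {
    stage := regionScanned
    for attempt := 0; stage != ingested; attempt++ {
        stage = wrote // write phase: KV pairs sent to all peers
        if err := ingestOnce(attempt); err != nil {
            // after this PR a NotLeader error always parks the job in needRescan:
            // the job is handed back to the queue, fresh region info (leader AND
            // peers) is fetched, and the write+ingest cycle restarts from scratch
            stage = needRescan
            stage = regionScanned // rescan done, retry the whole job
            continue
        }
        stage = ingested
    }
    fmt.Println("ingested after a full region rescan")
}

Retrying from needRescan costs one extra region scan per NotLeader, but it can never ingest against a stale peer list, which is exactly the failure mode described in the comment above.
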
22 changes: 9 additions & 13 deletions br/pkg/lightning/backend/local/region_job_test.go
@@ -28,7 +28,6 @@ import (
)

func TestIsIngestRetryable(t *testing.T) {
- ctx := context.Background()
region := &split.RegionInfo{
Leader: &metapb.Peer{Id: 1},
Region: &metapb.Region{
@@ -66,8 +65,6 @@
sstMeta: metas,
},
}
- splitCli := &mockSplitClient{}
-
// NotLeader may hide that the peers changed too, so the job always goes back to needRescan.

resp := &sst.IngestResponse{
@@ -79,11 +76,10 @@
}

clone := job
- canContinueIngest, err := (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ canContinueIngest, err := (&clone).convertStageOnIngestError(resp)
require.NoError(t, err)
- require.True(t, canContinueIngest)
- require.Equal(t, wrote, clone.stage)
- require.Equal(t, uint64(2), clone.region.Leader.Id)
+ require.False(t, canContinueIngest)
+ require.Equal(t, needRescan, clone.stage)
require.Error(t, clone.lastRetryableErr)

// EpochNotMatch means the region has changed; if the new region covers the old one, we can restart the write process.
@@ -106,7 +102,7 @@
},
}
clone = job
- canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ canContinueIngest, err = (&clone).convertStageOnIngestError(resp)
require.NoError(t, err)
require.False(t, canContinueIngest)
require.Equal(t, regionScanned, clone.stage)
@@ -131,7 +127,7 @@
},
}
clone = job
- canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ canContinueIngest, err = (&clone).convertStageOnIngestError(resp)
require.NoError(t, err)
require.False(t, canContinueIngest)
require.Equal(t, needRescan, clone.stage)
@@ -141,7 +137,7 @@

resp.Error = &errorpb.Error{Message: "raft: proposal dropped"}
clone = job
- canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ canContinueIngest, err = (&clone).convertStageOnIngestError(resp)
require.NoError(t, err)
require.False(t, canContinueIngest)
require.Equal(t, needRescan, clone.stage)
@@ -155,7 +151,7 @@
},
}
clone = job
- canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ canContinueIngest, err = (&clone).convertStageOnIngestError(resp)
require.NoError(t, err)
require.False(t, canContinueIngest)
require.Equal(t, needRescan, clone.stage)
@@ -167,7 +163,7 @@
DiskFull: &errorpb.DiskFull{},
}
clone = job
- _, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ _, err = (&clone).convertStageOnIngestError(resp)
require.ErrorContains(t, err, "non-retryable error")

// a general error is retryable starting from the write stage
@@ -176,7 +172,7 @@
StaleCommand: &errorpb.StaleCommand{},
}
clone = job
- canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+ canContinueIngest, err = (&clone).convertStageOnIngestError(resp)
require.NoError(t, err)
require.False(t, canContinueIngest)
require.Equal(t, regionScanned, clone.stage)