diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index 7c592f6d2e1eb..084988995d184 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -104,7 +104,7 @@ go_test(
     embed = [":local"],
     flaky = True,
     race = "on",
-    shard_count = 48,
+    shard_count = 49,
     deps = [
         "//br/pkg/lightning/backend",
         "//br/pkg/lightning/backend/encode",
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 4b0849c6ee9fc..15171faca05d5 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -697,10 +697,17 @@ func newMockImportClient() *mockImportClient {
 	}
 }
 
-func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) {
+func (c *mockImportClient) MultiIngest(_ context.Context, req *sst.MultiIngestRequest, _ ...grpc.CallOption) (*sst.IngestResponse, error) {
 	defer func() {
 		c.cnt++
 	}()
+	// mock Write stamps each SSTMeta with the serving store's ID (see Write
+	// below), so an ingest that reuses stale metas on another store fails.
+	for _, meta := range req.Ssts {
+		if meta.RegionId != c.store.GetId() {
+			return &sst.IngestResponse{Error: &errorpb.Error{Message: "The file which would be ingested doest not exist."}}, nil
+		}
+	}
 	if c.apiInvokeRecorder != nil {
 		c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId())
 	}
@@ -732,7 +739,7 @@ func (c *mockImportClient) Write(ctx context.Context, opts ...grpc.CallOption) (
 		c.apiInvokeRecorder["Write"] = append(c.apiInvokeRecorder["Write"], c.store.GetId())
 	}
 	return mockWriteClient{writeResp: &sst.WriteResponse{Metas: []*sst.SSTMeta{
-		{}, {}, {},
+		{RegionId: c.store.GetId()},
 	}}}, nil
 }
 
@@ -1230,6 +1237,118 @@ func TestCheckPeersBusy(t *testing.T) {
 	require.Equal(t, []byte("b"), retryJob.keyRange.end)
 }
 
+func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// simulate that lightning still holds stale region info (1,2,3) while the region has changed to (11,12,13)
+	apiInvokeRecorder := map[string][]uint64{}
+	notLeaderResp := &sst.IngestResponse{
+		Error: &errorpb.Error{
+			NotLeader: &errorpb.NotLeader{Leader: &metapb.Peer{StoreId: 11}},
+		}}
+
+	local := &Backend{
+		splitCli: initTestSplitClient3Replica([][]byte{{}, {'a'}, {}}, nil),
+		importClientFactory: &mockImportClientFactory{
+			stores: []*metapb.Store{
+				{Id: 1}, {Id: 2}, {Id: 3},
+				{Id: 11}, {Id: 12}, {Id: 13},
+			},
+			createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
+				importCli := newMockImportClient()
+				importCli.store = store
+				importCli.apiInvokeRecorder = apiInvokeRecorder
+				if store.Id == 1 {
+					importCli.retry = 1
+					importCli.resp = notLeaderResp
+				}
+				return importCli
+			},
+		},
+		logger:             log.L(),
+		writeLimiter:       noopStoreWriteLimiter{},
+		bufferPool:         membuf.NewPool(),
+		supportMultiIngest: true,
+		BackendConfig: BackendConfig{
+			ShouldCheckWriteStall: true,
+		},
+		tikvCodec: keyspace.CodecV1,
+	}
+
+	db, tmpPath := makePebbleDB(t, nil)
+	_, engineUUID := backend.MakeUUID("ww", 0)
+	engineCtx, cancel2 := context.WithCancel(context.Background())
+	f := &Engine{
+		db:           db,
+		UUID:         engineUUID,
+		sstDir:       tmpPath,
+		ctx:          engineCtx,
+		cancel:       cancel2,
+		sstMetasChan: make(chan metaOrFlush, 64),
+		keyAdapter:   noopKeyAdapter{},
+		logger:       log.L(),
+	}
+	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	require.NoError(t, err)
+
+	jobCh := make(chan *regionJob, 10)
+
+	staleJob := &regionJob{
+		keyRange: Range{start: []byte("a"), end: []byte("")},
+		region: &split.RegionInfo{
+			Region: &metapb.Region{
+				Id: 1,
+				Peers: []*metapb.Peer{
+					{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3},
+				},
+				StartKey: []byte("a"),
+				EndKey:   []byte(""),
+			},
+			Leader: &metapb.Peer{Id: 1, StoreId: 1},
+		},
+		stage:  regionScanned,
+		engine: f,
+	}
+	var jobWg sync.WaitGroup
+	jobWg.Add(1)
+	jobCh <- staleJob
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	jobOutCh := make(chan *regionJob)
+	go func() {
+		defer wg.Done()
+		for {
+			job := <-jobOutCh
+			if job.stage == ingested {
+				jobWg.Done()
+				return
+			}
+			jobCh <- job
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
+		require.NoError(t, err)
+	}()
+
+	jobWg.Wait()
+	cancel()
+	wg.Wait()
+
+	// the write-stall check "ingest"s to every peer of the stale region: 1,2,3
+	// then "write" goes to the stale region: 1,2,3
+	// then "ingest" goes to the stale leader: 1
+	// that ingest hits NotLeader, so the new region (11,12,13) is scanned
+	// and the steps above repeat for 11,12,13
+	require.Equal(t, []uint64{1, 2, 3, 11, 12, 13}, apiInvokeRecorder["Write"])
+	// only each region's leader (1, then 11) gets the real ingest; the rest are write-stall checks
+	require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"])
+}
+
 // mockGetSizeProperties mocks that 50MB * 20 SST file.
 func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) {
 	props := newSizeProperties()
diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go
index 54bf3e5a28229..797a93aa756bb 100644
--- a/br/pkg/lightning/backend/local/region_job.go
+++ b/br/pkg/lightning/backend/local/region_job.go
@@ -386,7 +386,6 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 // if any underlying logic has error, ingest will return an error to let caller
 // handle it.
 func (local *Backend) ingest(ctx context.Context, j *regionJob) error {
-	splitCli := local.splitCli
 	if j.stage != wrote {
 		return nil
 	}
@@ -418,7 +417,7 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) error {
 				logutil.Region(j.region.Region), logutil.Leader(j.region.Leader))
 			continue
 		}
-		canContinue, err := j.convertStageOnIngestError(ctx, resp, splitCli)
+		canContinue, err := j.convertStageOnIngestError(resp)
 		if common.IsContextCanceledError(err) {
 			return err
 		}
@@ -559,53 +558,21 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe
 // Return (true, nil) when the job can retry ingesting immediately.
 // Return (false, nil) when the job should be put back to queue.
 func (j *regionJob) convertStageOnIngestError(
-	ctx context.Context,
 	resp *sst.IngestResponse,
-	splitCli split.SplitClient,
 ) (bool, error) {
 	if resp.GetError() == nil {
 		return true, nil
 	}
 
-	getRegion := func() (*split.RegionInfo, error) {
-		for i := 0; ; i++ {
-			newRegion, err := splitCli.GetRegion(ctx, j.region.Region.GetStartKey())
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			if newRegion != nil {
-				return newRegion, nil
-			}
-			log.FromContext(ctx).Warn("get region by key return nil, will retry",
-				logutil.Region(j.region.Region), logutil.Leader(j.region.Leader),
-				zap.Int("retry", i))
-			select {
-			case <-ctx.Done():
-				return nil, ctx.Err()
-			case <-time.After(time.Second):
-			}
-		}
-	}
-
-	var newRegion *split.RegionInfo
-	var err error
 	switch errPb := resp.GetError(); {
 	case errPb.NotLeader != nil:
 		j.lastRetryableErr = common.ErrKVNotLeader.GenWithStack(errPb.GetMessage())
 
-		if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil {
-			newRegion = &split.RegionInfo{
-				Leader: newLeader,
-				Region: j.region.Region,
-			}
-		} else {
-			newRegion, err = getRegion()
-			if err != nil {
-				return false, errors.Trace(err)
-			}
-		}
-		j.region = newRegion
-		return true, nil
+		// we have seen the region's leader and peers all change while TiKV
+		// still only returned NotLeader, so rescan the whole region info.
+		j.convertStageTo(needRescan)
+		return false, nil
 	case errPb.EpochNotMatch != nil:
 		j.lastRetryableErr = common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
diff --git a/br/pkg/lightning/backend/local/region_job_test.go b/br/pkg/lightning/backend/local/region_job_test.go
index db555ff20d0fc..fd508372ef9f9 100644
--- a/br/pkg/lightning/backend/local/region_job_test.go
+++ b/br/pkg/lightning/backend/local/region_job_test.go
@@ -28,7 +28,6 @@ import (
 )
 
 func TestIsIngestRetryable(t *testing.T) {
-	ctx := context.Background()
 	region := &split.RegionInfo{
 		Leader: &metapb.Peer{Id: 1},
 		Region: &metapb.Region{
@@ -66,8 +65,6 @@ func TestIsIngestRetryable(t *testing.T) {
 			sstMeta: metas,
 		},
 	}
-	splitCli := &mockSplitClient{}
-
-	// NotLeader doesn't mean region peers are changed, so we can retry ingest.
+	// NotLeader means the region info is stale, so the job must rescan it.
 	resp := &sst.IngestResponse{
 		Error: &errorpb.Error{
 			NotLeader: &errorpb.NotLeader{
 				Leader: &metapb.Peer{Id: 2},
 			},
 		},
 	}
 
 	clone := job
-	canContinueIngest, err := (&clone).convertStageOnIngestError(ctx, resp, splitCli)
+	canContinueIngest, err := (&clone).convertStageOnIngestError(resp)
 	require.NoError(t, err)
-	require.True(t, canContinueIngest)
-	require.Equal(t, wrote, clone.stage)
-	require.Equal(t, uint64(2), clone.region.Leader.Id)
+	require.False(t, canContinueIngest)
+	require.Equal(t, needRescan, clone.stage)
 	require.Error(t, clone.lastRetryableErr)
 
 	// EpochNotMatch means region is changed, if the new region covers the old, we can restart the writing process.
@@ -106,7 +102,7 @@ func TestIsIngestRetryable(t *testing.T) { }, } clone = job - canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(resp) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, regionScanned, clone.stage) @@ -131,7 +127,7 @@ func TestIsIngestRetryable(t *testing.T) { }, } clone = job - canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(resp) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, needRescan, clone.stage) @@ -141,7 +137,7 @@ func TestIsIngestRetryable(t *testing.T) { resp.Error = &errorpb.Error{Message: "raft: proposal dropped"} clone = job - canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(resp) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, needRescan, clone.stage) @@ -155,7 +151,7 @@ func TestIsIngestRetryable(t *testing.T) { }, } clone = job - canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(resp) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, needRescan, clone.stage) @@ -167,7 +163,7 @@ func TestIsIngestRetryable(t *testing.T) { DiskFull: &errorpb.DiskFull{}, } clone = job - _, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) + _, err = (&clone).convertStageOnIngestError(resp) require.ErrorContains(t, err, "non-retryable error") // a general error is retryable from writing @@ -176,7 +172,7 @@ func TestIsIngestRetryable(t *testing.T) { StaleCommand: &errorpb.StaleCommand{}, } clone = job - canContinueIngest, err = (&clone).convertStageOnIngestError(ctx, resp, splitCli) + canContinueIngest, err = (&clone).convertStageOnIngestError(resp) require.NoError(t, err) require.False(t, canContinueIngest) require.Equal(t, regionScanned, clone.stage)
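Note for reviewers: the behavioral core of this patch is small but easy to miss inside the diff, so here is a minimal, self-contained sketch of the before/after handling of NotLeader. All names here (`stage`, `ingestError`, `convertStage`) are invented for illustration and are not the PR's real types; the real `regionJob` also carries the key range, region info and SST metas. The point is that instead of patching the leader in place (and possibly calling GetRegion inline), the job is now demoted to a rescan state and put back on the queue.

```go
package main

import "fmt"

// Illustrative stand-ins for the PR's job stages.
type stage int

const (
	regionScanned stage = iota // region info is fresh, ready to write
	wrote                      // data written, ready to ingest
	ingested                   // ingest succeeded, terminal state
	needRescan                 // region info is stale, rescan before retrying
)

// ingestError is a simplified stand-in for errorpb.Error.
type ingestError struct {
	notLeader     bool
	epochNotMatch bool
}

// convertStage mirrors the new convertStageOnIngestError contract:
// the bool is true when ingest can be retried immediately and false when
// the job must be put back to the queue in the returned stage.
func convertStage(cur stage, e *ingestError) (stage, bool) {
	if e == nil {
		return cur, true
	}
	switch {
	case e.notLeader:
		// Old behavior: trust the leader hint in the error and retry with
		// the same peers. New behavior: the leader AND the peers may all
		// have changed, so force a full rescan of the region info.
		return needRescan, false
	case e.epochNotMatch:
		// Simplified; the real code rescans or rewrites depending on how
		// the new region relates to the job's key range.
		return regionScanned, false
	default:
		return regionScanned, false
	}
}

func main() {
	next, retry := convertStage(wrote, &ingestError{notLeader: true})
	fmt.Println(next == needRescan, retry) // true false
}
```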
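The mock changes in local_test.go rely on a small stamping trick worth calling out: mock Write tags every SSTMeta it returns with the serving store's ID, and mock MultiIngest rejects metas carrying a different ID. That is how the test detects an ingest that replays stale metas against the new region's stores. A stripped-down sketch of the same pattern, with invented `meta`/`mockStore` types standing in for the test's mocks:

```go
package main

import "fmt"

// meta records which store produced it, like the mocked SSTMeta.RegionId.
type meta struct{ origin uint64 }

type mockStore struct{ id uint64 }

// write stamps the returned artifact with the producing store's identity.
func (s *mockStore) write() meta { return meta{origin: s.id} }

// ingest rejects artifacts that were written against a different store,
// mimicking TiKV refusing to ingest an SST file it never received.
func (s *mockStore) ingest(m meta) error {
	if m.origin != s.id {
		return fmt.Errorf("store %d: stale meta from store %d", s.id, m.origin)
	}
	return nil
}

func main() {
	oldLeader, newLeader := &mockStore{id: 1}, &mockStore{id: 11}
	m := oldLeader.write()
	fmt.Println(oldLeader.ingest(m)) // <nil>: metas match the writing store
	fmt.Println(newLeader.ingest(m)) // error: detected replay of stale metas
}
```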
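Finally, TestNotLeaderErrorNeedUpdatePeers drives the worker with a feedback loop: jobs leaving the worker on jobOutCh are pushed back onto jobCh until they reach the ingested stage, which is what lets the needRescan demotion actually trigger a second write/ingest round against the new region. A toy version of that loop, under my own simplifying assumption that a two-state job is enough to show the shape:

```go
package main

import (
	"fmt"
	"sync"
)

// job is a stand-in for regionJob; done plays the role of the ingested stage.
type job struct{ done bool }

func main() {
	jobCh := make(chan *job, 10) // worker input
	jobOutCh := make(chan *job)  // worker output
	var jobWg sync.WaitGroup

	jobWg.Add(1)
	jobCh <- &job{}

	// Feedback loop: finished jobs complete the WaitGroup; unfinished ones
	// (e.g. demoted to a rescan state) are pushed back for another round.
	go func() {
		for j := range jobOutCh {
			if j.done {
				jobWg.Done()
				return
			}
			jobCh <- j
		}
	}()

	// Toy "worker": fails the first attempt (stale region), succeeds on retry.
	go func() {
		attempts := 0
		for j := range jobCh {
			attempts++
			j.done = attempts > 1
			jobOutCh <- j
		}
	}()

	jobWg.Wait()
	fmt.Println("job ingested after retry")
}
```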