From f3348573c3892484240b414b45552690e9dd4437 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Mon, 15 May 2023 14:22:32 +0800
Subject: [PATCH 1/2] lightning: new regionJob from needRescan has zero retry counter

Signed-off-by: lance6716
---
 br/pkg/lightning/backend/local/local.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 3fdd07fca7180..a2a46f5c4fac8 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1277,7 +1277,6 @@ func (local *Backend) startWorker(
 			// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
 			jobWg.Add(len(jobs) - 1)
 			for _, j := range jobs {
-				j.retryCount = job.retryCount
 				jobOutCh <- j
 			}
 		}

From 5c5af07dafd0cb764b3541e5c08fe1db3aa937a9 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Tue, 16 May 2023 11:25:31 +0800
Subject: [PATCH 2/2] add test

Signed-off-by: lance6716
---
 br/pkg/lightning/backend/local/local.go      |  3 +-
 br/pkg/lightning/backend/local/local_test.go | 66 ++++++++++++++++++++
 br/pkg/restore/split_test.go                 |  8 +--
 3 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index a2a46f5c4fac8..6e9e69711af19 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1163,7 +1163,7 @@ func (local *Backend) generateAndSendJob(
 	return eg.Wait()
 }
 
-// fakeRegionJobs is used in test , the injected job can be found by (startKey, endKey).
+// fakeRegionJobs is used in test, the injected job can be found by (startKey, endKey).
 var fakeRegionJobs map[[2]string]struct {
 	jobs []*regionJob
 	err  error
@@ -1277,6 +1277,7 @@ func (local *Backend) startWorker(
 			// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
 			jobWg.Add(len(jobs) - 1)
 			for _, j := range jobs {
+				j.lastRetryableErr = job.lastRetryableErr
 				jobOutCh <- j
 			}
 		}
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 538510b9e440c..74945c2aad3c2 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -1835,3 +1835,69 @@ func TestDoImport(t *testing.T) {
 		}
 	}
 }
+
+func TestRegionJobResetRetryCounter(t *testing.T) {
+	backup := maxRetryBackoffSecond
+	maxRetryBackoffSecond = 1
+	t.Cleanup(func() {
+		maxRetryBackoffSecond = backup
+	})
+
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
+	t.Cleanup(func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
+	})
+
+	// test that a job that needs rescan during ingest is retried with a fresh retry counter
+
+	initRanges := []Range{
+		{start: []byte{'c'}, end: []byte{'d'}},
+	}
+	fakeRegionJobs = map[[2]string]struct {
+		jobs []*regionJob
+		err  error
+	}{
+		{"c", "d"}: {
+			jobs: []*regionJob{
+				{
+					keyRange:   Range{start: []byte{'c'}, end: []byte{'c', '2'}},
+					engine:     &Engine{},
+					injected:   getNeedRescanWhenIngestBehaviour(),
+					retryCount: maxWriteAndIngestRetryTimes,
+				},
+				{
+					keyRange:   Range{start: []byte{'c', '2'}, end: []byte{'d'}},
+					engine:     &Engine{},
+					injected:   getSuccessInjectedBehaviour(),
+					retryCount: maxWriteAndIngestRetryTimes,
+				},
+			},
+		},
+		{"c", "c2"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
+					engine:   &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+				},
+			},
+		},
+	}
+
+	ctx := context.Background()
+	l := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 2,
+		},
+	}
+	e := &Engine{}
+	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+	require.NoError(t, err)
+	for _, v := range fakeRegionJobs {
+		for _, job := range v.jobs {
+			require.Len(t, job.injected, 0)
+		}
+	}
+}
diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 374897f6cf5c6..788b9f86669ef 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -776,7 +776,7 @@ func TestSplitPoint(t *testing.T) {
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: split.Value{Size: 100, Number: 100}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: split.Value{Size: 200, Number: 200}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
-	client := NewFakeSplitClient()
+	client := newFakeSplitClient()
 	client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f"))
 	client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h"))
 	client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j"))
@@ -828,7 +828,7 @@ func TestSplitPoint2(t *testing.T) {
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: split.Value{Size: 200, Number: 200}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: split.Value{Size: 200, Number: 200}})
-	client := NewFakeSplitClient()
+	client := newFakeSplitClient()
 	client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g"))
 	client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0)))
 	for i := 0; i < 256; i++ {
@@ -879,7 +879,7 @@ type fakeSplitClient struct {
 	regions []*split.RegionInfo
 }
 
-func NewFakeSplitClient() *fakeSplitClient {
+func newFakeSplitClient() *fakeSplitClient {
 	return &fakeSplitClient{
 		regions: make([]*split.RegionInfo, 0),
 	}
@@ -1012,7 +1012,7 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) {
 	}
 	mockIter := &mockLogIter{}
 	ctx := context.Background()
-	logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient(), 144*1024*1024, 1440000)
+	logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, newFakeSplitClient(), 144*1024*1024, 1440000)
 	next := 0
 	for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) {
 		require.NoError(t, r.Err)
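
Below is a minimal, self-contained Go sketch of the retry semantics the two commits pin down; it is an illustration, not the backend's real startWorker code. The trimmed-down regionJob struct and the splitRescannedJob helper are hypothetical stand-ins: jobs regenerated after a "needRescan" result carry over lastRetryableErr for error reporting, but are deliberately left with a zero retryCount.

// Illustrative sketch only: regionJob is a trimmed stand-in and
// splitRescannedJob is a hypothetical helper, not the backend's real code.
package main

import (
	"errors"
	"fmt"
)

type regionJob struct {
	startKey, endKey string
	retryCount       int
	lastRetryableErr error
}

// splitRescannedJob regenerates jobs for the key range of a job whose ingest
// step reported "needRescan". It mirrors the patched loop in startWorker:
// lastRetryableErr is propagated, while retryCount is intentionally not
// copied, so every regenerated job starts with a fresh retry budget.
func splitRescannedJob(job *regionJob, splitKeys []string) []*regionJob {
	bounds := append([]string{job.startKey}, splitKeys...)
	bounds = append(bounds, job.endKey)
	jobs := make([]*regionJob, 0, len(bounds)-1)
	for i := 0; i+1 < len(bounds); i++ {
		jobs = append(jobs, &regionJob{
			startKey:         bounds[i],
			endKey:           bounds[i+1],
			lastRetryableErr: job.lastRetryableErr, // kept for error reporting
			// retryCount stays at its zero value: previous retries do not
			// count against the regenerated jobs.
		})
	}
	return jobs
}

func main() {
	exhausted := &regionJob{
		startKey:         "c",
		endKey:           "d",
		retryCount:       20, // pretend the old retry budget is used up
		lastRetryableErr: errors.New("example retryable error from a previous attempt"),
	}
	for _, j := range splitRescannedJob(exhausted, []string{"c2"}) {
		fmt.Printf("[%s, %s) retryCount=%d lastErr=%v\n",
			j.startKey, j.endKey, j.retryCount, j.lastRetryableErr)
	}
}

This is also why TestRegionJobResetRetryCounter can start its injected jobs at maxWriteAndIngestRetryTimes and still expect doImport to succeed: after the rescan, the sub-jobs for [c, c2) and [c2, d) begin with a fresh retry counter.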