diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 69c5a68fe5432..c9cbe8dcee01a 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -393,9 +393,7 @@ type BackendConfig struct {
 	MaxConnPerStore int
 	// compress type when write or ingest into tikv
 	ConnCompressType config.CompressionType
-	// concurrency of generateJobForRange.
-	RangeConcurrency int
-	// number of import(write & ingest) workers
+	// concurrency of generateJobForRange and import(write & ingest) workers
 	WorkerConcurrency int
 	KVWriteBatchSize int
 	CheckpointEnabled bool
@@ -429,7 +427,6 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
 		LocalStoreDir: cfg.TikvImporter.SortedKVDir,
 		MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
 		ConnCompressType: cfg.TikvImporter.CompressKVPairs,
-		RangeConcurrency: cfg.TikvImporter.RangeConcurrency,
 		WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
 		KVWriteBatchSize: cfg.TikvImporter.SendKVPairs,
 		CheckpointEnabled: cfg.Checkpoint.Enable,
@@ -1136,7 +1133,7 @@ func (local *Backend) generateAndSendJob(
 	logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))
 
 	eg, egCtx := errgroup.WithContext(ctx)
-	eg.SetLimit(local.RangeConcurrency)
+	eg.SetLimit(local.WorkerConcurrency)
 	for _, jobRange := range jobRanges {
 		r := jobRange
 		eg.Go(func() error {
@@ -1169,7 +1166,7 @@
 	return eg.Wait()
 }
 
-// fakeRegionJobs is used in test , the injected job can be found by (startKey, endKey).
+// fakeRegionJobs is used in test, the injected job can be found by (startKey, endKey).
 var fakeRegionJobs map[[2]string]struct {
 	jobs []*regionJob
 	err error
@@ -1283,7 +1280,7 @@ func (local *Backend) startWorker(
 				// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
 				jobWg.Add(len(jobs) - 1)
 				for _, j := range jobs {
-					j.retryCount = job.retryCount
+					j.lastRetryableErr = job.lastRetryableErr
 					jobOutCh <- j
 				}
 			}
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 09e714b8368c5..ddd1c17ff2d34 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -1635,7 +1635,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
 			panicSplitRegionClient{}, // make sure no further split region
 		),
 	}
-	local.BackendConfig.RangeConcurrency = 1
+	local.BackendConfig.WorkerConcurrency = 1
 	db, tmpPath := makePebbleDB(t, nil)
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	ctx := context.Background()
@@ -1830,7 +1830,6 @@ func TestDoImport(t *testing.T) {
 	ctx := context.Background()
 	l := &Backend{
 		BackendConfig: BackendConfig{
-			RangeConcurrency: 1,
 			WorkerConcurrency: 2,
 		},
 	}
@@ -1963,3 +1962,69 @@
 		}
 	}
 }
+
+func TestRegionJobResetRetryCounter(t *testing.T) {
+	backup := maxRetryBackoffSecond
+	maxRetryBackoffSecond = 1
+	t.Cleanup(func() {
+		maxRetryBackoffSecond = backup
+	})
+
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
+	t.Cleanup(func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
+	})
+
+	// test that the retry counter is reset when a job needs rescan during ingest
+
+	initRanges := []Range{
+		{start: []byte{'c'}, end: []byte{'d'}},
+	}
+	fakeRegionJobs = map[[2]string]struct {
+		jobs []*regionJob
+		err error
+	}{
+		{"c", "d"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
+					engine: &Engine{},
+					injected: getNeedRescanWhenIngestBehaviour(),
+					retryCount: maxWriteAndIngestRetryTimes,
+				},
+				{
+					keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
+					engine: &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+					retryCount: maxWriteAndIngestRetryTimes,
+				},
+			},
+		},
+		{"c", "c2"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
+					engine: &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+				},
+			},
+		},
+	}
+
+	ctx := context.Background()
+	l := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 2,
+		},
+	}
+	e := &Engine{}
+	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+	require.NoError(t, err)
+	for _, v := range fakeRegionJobs {
+		for _, job := range v.jobs {
+			require.Len(t, job.injected, 0)
+		}
+	}
+}
diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 374897f6cf5c6..788b9f86669ef 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -776,7 +776,7 @@ func TestSplitPoint(t *testing.T) {
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: split.Value{Size: 100, Number: 100}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: split.Value{Size: 200, Number: 200}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
-	client := NewFakeSplitClient()
+	client := newFakeSplitClient()
 	client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f"))
 	client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h"))
 	client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j"))
@@ -828,7 +828,7 @@ func TestSplitPoint2(t *testing.T) {
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: split.Value{Size: 200, Number: 200}})
 	splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: split.Value{Size: 200, Number: 200}})
-	client := NewFakeSplitClient()
+	client := newFakeSplitClient()
 	client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g"))
 	client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0)))
 	for i := 0; i < 256; i++ {
@@ -879,7 +879,7 @@ type fakeSplitClient struct {
 	regions []*split.RegionInfo
 }
-func NewFakeSplitClient() *fakeSplitClient {
+func newFakeSplitClient() *fakeSplitClient {
 	return &fakeSplitClient{
 		regions: make([]*split.RegionInfo, 0),
 	}
 }
@@ -1012,7 +1012,7 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) {
 	}
 	mockIter := &mockLogIter{}
 	ctx := context.Background()
-	logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient(), 144*1024*1024, 1440000)
+	logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, newFakeSplitClient(), 144*1024*1024, 1440000)
 	next := 0
 	for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) {
 		require.NoError(t, r.Err)
diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go
index 78aae4fb11e2a..fc566f08cc364 100644
--- a/executor/importer/table_import.go
+++ b/executor/importer/table_import.go
@@ -114,7 +114,6 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm
 		LocalStoreDir: dir,
 		MaxConnPerStore: config.DefaultRangeConcurrency,
 		ConnCompressType: config.CompressionNone,
-		RangeConcurrency: config.DefaultRangeConcurrency,
 		WorkerConcurrency: config.DefaultRangeConcurrency * 2,
 		KVWriteBatchSize: config.KVWriteBatchSize,
 		// todo: local backend report error when the sort-dir already exists & checkpoint disabled.
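
The following is not part of the patch: a minimal, self-contained Go sketch of the behavioral change in the `startWorker` hunk. When a "needRescan" job is split into new "regionScanned" jobs, the new jobs now carry over only `lastRetryableErr`, so their retry counter starts from zero — the behavior exercised by `TestRegionJobResetRetryCounter`. Here `regionJob` is a simplified stand-in (the real struct has many more fields) and `splitForRescan` is a hypothetical helper, not the actual backend code.

```go
package main

import (
	"errors"
	"fmt"
)

// regionJob is a simplified stand-in for the real struct in
// br/pkg/lightning/backend/local; only the fields touched by this patch
// are modeled here.
type regionJob struct {
	keyRange         [2]string
	retryCount       int
	lastRetryableErr error
}

// splitForRescan is a hypothetical helper mirroring the loop changed in
// startWorker: one "needRescan" job becomes several "regionScanned" jobs.
// After this patch the new jobs inherit only lastRetryableErr, so their
// retryCount starts again from zero instead of copying the parent's count.
func splitForRescan(parent *regionJob, ranges [][2]string) []*regionJob {
	jobs := make([]*regionJob, 0, len(ranges))
	for _, r := range ranges {
		jobs = append(jobs, &regionJob{
			keyRange:         r,
			lastRetryableErr: parent.lastRetryableErr, // kept for error reporting
			// retryCount is deliberately left at 0: the region boundaries
			// changed, so earlier attempts should not count against the
			// regenerated jobs.
		})
	}
	return jobs
}

func main() {
	parent := &regionJob{
		keyRange:         [2]string{"c", "d"},
		retryCount:       3, // pretend the retry budget was already used up
		lastRetryableErr: errors.New("illustrative retryable error"),
	}
	for _, j := range splitForRescan(parent, [][2]string{{"c", "c2"}, {"c2", "d"}}) {
		fmt.Printf("range=%v retryCount=%d lastErr=%v\n", j.keyRange, j.retryCount, j.lastRetryableErr)
	}
}
```

This is also why the new test seeds its injected jobs with `retryCount: maxWriteAndIngestRetryTimes`: if the counter were still copied to the rescanned jobs, the retry budget would already be exhausted and `doImport` would fail instead of succeeding.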