Skip to content

Commit

Permalink
lightning: new regionJob from needRescan has zero retry counter (#43810
Browse files Browse the repository at this point in the history
…) (#43902)

close #43682
  • Loading branch information
ti-chi-bot committed May 17, 2023
1 parent 7238601 commit c4d8008
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 14 deletions.
11 changes: 4 additions & 7 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,7 @@ type BackendConfig struct {
MaxConnPerStore int
// compress type when write or ingest into tikv
ConnCompressType config.CompressionType
// concurrency of generateJobForRange.
RangeConcurrency int
// number of import(write & ingest) workers
// concurrency of generateJobForRange and import(write & ingest) workers
WorkerConcurrency int
KVWriteBatchSize int
CheckpointEnabled bool
Expand Down Expand Up @@ -429,7 +427,6 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
RangeConcurrency: cfg.TikvImporter.RangeConcurrency,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: cfg.TikvImporter.SendKVPairs,
CheckpointEnabled: cfg.Checkpoint.Enable,
Expand Down Expand Up @@ -1136,7 +1133,7 @@ func (local *Backend) generateAndSendJob(
logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))

eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(local.RangeConcurrency)
eg.SetLimit(local.WorkerConcurrency)
for _, jobRange := range jobRanges {
r := jobRange
eg.Go(func() error {
Expand Down Expand Up @@ -1169,7 +1166,7 @@ func (local *Backend) generateAndSendJob(
return eg.Wait()
}

// fakeRegionJobs is used in test , the injected job can be found by (startKey, endKey).
// fakeRegionJobs is used in test, the injected job can be found by (startKey, endKey).
var fakeRegionJobs map[[2]string]struct {
jobs []*regionJob
err error
Expand Down Expand Up @@ -1283,7 +1280,7 @@ func (local *Backend) startWorker(
// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
jobWg.Add(len(jobs) - 1)
for _, j := range jobs {
j.retryCount = job.retryCount
j.lastRetryableErr = job.lastRetryableErr
jobOutCh <- j
}
}
Expand Down
69 changes: 67 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
panicSplitRegionClient{}, // make sure no further split region
),
}
local.BackendConfig.RangeConcurrency = 1
local.BackendConfig.WorkerConcurrency = 1
db, tmpPath := makePebbleDB(t, nil)
_, engineUUID := backend.MakeUUID("ww", 0)
ctx := context.Background()
Expand Down Expand Up @@ -1830,7 +1830,6 @@ func TestDoImport(t *testing.T) {
ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
RangeConcurrency: 1,
WorkerConcurrency: 2,
},
}
Expand Down Expand Up @@ -1963,3 +1962,69 @@ func TestDoImport(t *testing.T) {
}
}
}

func TestRegionJobResetRetryCounter(t *testing.T) {
backup := maxRetryBackoffSecond
maxRetryBackoffSecond = 1
t.Cleanup(func() {
maxRetryBackoffSecond = backup
})

_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
})

// test that job need rescan when ingest

initRanges := []Range{
{start: []byte{'c'}, end: []byte{'d'}},
}
fakeRegionJobs = map[[2]string]struct {
jobs []*regionJob
err error
}{
{"c", "d"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
engine: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
},
{
keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
engine: &Engine{},
injected: getSuccessInjectedBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
},
},
},
{"c", "c2"}: {
jobs: []*regionJob{
{
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
engine: &Engine{},
injected: getSuccessInjectedBehaviour(),
},
},
},
}

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 2,
},
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.NoError(t, err)
for _, v := range fakeRegionJobs {
for _, job := range v.jobs {
require.Len(t, job.injected, 0)
}
}
}
8 changes: 4 additions & 4 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func TestSplitPoint(t *testing.T) {
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: split.Value{Size: 100, Number: 100}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: split.Value{Size: 200, Number: 200}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
client := NewFakeSplitClient()
client := newFakeSplitClient()
client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f"))
client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h"))
client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j"))
Expand Down Expand Up @@ -828,7 +828,7 @@ func TestSplitPoint2(t *testing.T) {
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: split.Value{Size: 200, Number: 200}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: split.Value{Size: 200, Number: 200}})
client := NewFakeSplitClient()
client := newFakeSplitClient()
client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g"))
client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0)))
for i := 0; i < 256; i++ {
Expand Down Expand Up @@ -879,7 +879,7 @@ type fakeSplitClient struct {
regions []*split.RegionInfo
}

func NewFakeSplitClient() *fakeSplitClient {
func newFakeSplitClient() *fakeSplitClient {
return &fakeSplitClient{
regions: make([]*split.RegionInfo, 0),
}
Expand Down Expand Up @@ -1012,7 +1012,7 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) {
}
mockIter := &mockLogIter{}
ctx := context.Background()
logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient(), 144*1024*1024, 1440000)
logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, newFakeSplitClient(), 144*1024*1024, 1440000)
next := 0
for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) {
require.NoError(t, r.Err)
Expand Down
1 change: 0 additions & 1 deletion executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm
LocalStoreDir: dir,
MaxConnPerStore: config.DefaultRangeConcurrency,
ConnCompressType: config.CompressionNone,
RangeConcurrency: config.DefaultRangeConcurrency,
WorkerConcurrency: config.DefaultRangeConcurrency * 2,
KVWriteBatchSize: config.KVWriteBatchSize,
// todo: local backend report error when the sort-dir already exists & checkpoint disabled.
Expand Down

0 comments on commit c4d8008

Please sign in to comment.