diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 399939fcd8e15..7dc5b083ab6ce 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -952,6 +952,9 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
 		LowerBound: lowerBound,
 		UpperBound: upperBound,
 	}
+	failpoint.Inject("mockGetFirstAndLastKey", func() {
+		failpoint.Return(lowerBound, upperBound, nil)
+	})
 
 	iter := e.newKVIter(context.Background(), opt)
 	//nolint: errcheck
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 1c6eb1e7cb8d6..befb52e42e46a 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1121,10 +1121,63 @@ func (local *Backend) generateAndSendJob(
 ) error {
 	logger := log.FromContext(ctx)
 
+<<<<<<< HEAD
 	// when use dynamic region feature, the region may be very big, we need
 	// to split to smaller ranges to increase the concurrency.
 	if regionSplitSize > 2*int64(config.SplitRegionSize) {
 		sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
+=======
+	logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))
+
+	eg, egCtx := errgroup.WithContext(ctx)
+
+	dataAndRangeCh := make(chan common.DataAndRange)
+	for i := 0; i < local.WorkerConcurrency; i++ {
+		eg.Go(func() error {
+			for {
+				select {
+				case <-egCtx.Done():
+					return nil
+				case p, ok := <-dataAndRangeCh:
+					if !ok {
+						return nil
+					}
+
+					failpoint.Inject("beforeGenerateJob", nil)
+					failpoint.Inject("sendDummyJob", func(_ failpoint.Value) {
+						// this is used to trigger worker failure, used together
+						// with WriteToTiKVNotEnoughDiskSpace
+						jobToWorkerCh <- &regionJob{}
+						time.Sleep(5 * time.Second)
+					})
+					jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys)
+					if err != nil {
+						if common.IsContextCanceledError(err) {
+							return nil
+						}
+						return err
+					}
+					for _, job := range jobs {
+						job.ref(jobWg)
+						select {
+						case <-egCtx.Done():
+							// this job is not put into jobToWorkerCh
+							job.done(jobWg)
+							// if the context is canceled, it means worker has error, the first error can be
+							// found by worker's error group LATER. if this function returns an error it will
+							// seize the "first error".
+							return nil
+						case jobToWorkerCh <- job:
+						}
+					}
+				}
+			}
+		})
+	}
+
+	eg.Go(func() error {
+		err := engine.LoadIngestData(egCtx, jobRanges, dataAndRangeCh)
+>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1548,6 +1601,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
 		})
 	}
 
+<<<<<<< HEAD
 	err := local.prepareAndSendJob(
 		workerCtx,
 		engine,
@@ -1563,11 +1617,34 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
 		_ = workGroup.Wait()
 		return firstErr.Get()
 	}
+=======
+	failpoint.Label("afterStartWorker")
+
+	workGroup.Go(func() error {
+		err := local.prepareAndSendJob(
+			workerCtx,
+			engine,
+			regionRanges,
+			regionSplitSize,
+			regionSplitKeys,
+			jobToWorkerCh,
+			&jobWg,
+		)
+		if err != nil {
+			return err
+		}
+>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
 
-	jobWg.Wait()
-	workerCancel()
-	firstErr.Set(workGroup.Wait())
-	firstErr.Set(ctx.Err())
+		jobWg.Wait()
+		workerCancel()
+		return nil
+	})
+	if err := workGroup.Wait(); err != nil {
+		if !common.IsContextCanceledError(err) {
+			log.FromContext(ctx).Error("do import meets error", zap.Error(err))
+		}
+		firstErr.Set(err)
+	}
 	return firstErr.Get()
 }
 
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 902dd906fa040..08d78096f2d83 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -2085,3 +2085,160 @@ func TestCtxCancelIsIgnored(t *testing.T) {
 	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
 	require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
 }
+<<<<<<< HEAD
+=======
+
+func TestWorkerFailedWhenGeneratingJobs(t *testing.T) {
+	backup := maxRetryBackoffSecond
+	maxRetryBackoffSecond = 1
+	t.Cleanup(func() {
+		maxRetryBackoffSecond = backup
+	})
+
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
+	t.Cleanup(func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
+	})
+
+	initRanges := []common.Range{
+		{Start: []byte{'c'}, End: []byte{'d'}},
+	}
+
+	ctx := context.Background()
+	l := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 1,
+		},
+		splitCli: initTestSplitClient(
+			[][]byte{{1}, {11}},
+			panicSplitRegionClient{},
+		),
+	}
+	e := &Engine{}
+	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+	require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
+}
+
+func TestExternalEngine(t *testing.T) {
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables", "return()")
+	t.Cleanup(func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables")
+	})
+	ctx := context.Background()
+	dir := t.TempDir()
+	storageURI := "file://" + filepath.ToSlash(dir)
+	storeBackend, err := storage.ParseBackend(storageURI, nil)
+	require.NoError(t, err)
+	extStorage, err := storage.New(ctx, storeBackend, nil)
+	require.NoError(t, err)
+	keys := make([][]byte, 100)
+	values := make([][]byte, 100)
+	for i := range keys {
+		keys[i] = []byte(fmt.Sprintf("key%06d", i))
+		values[i] = []byte(fmt.Sprintf("value%06d", i))
+	}
+	// simple append 0x00
+	endKey := make([]byte, len(keys[99])+1)
+	copy(endKey, keys[99])
+
+	dataFiles, statFiles, err := external.MockExternalEngine(extStorage, keys, values)
+	require.NoError(t, err)
+
+	externalCfg := &backend.ExternalEngineConfig{
+		StorageURI:    storageURI,
+		DataFiles:     dataFiles,
+		StatFiles:     statFiles,
+		StartKey:      keys[0],
+		EndKey:        endKey,
+		SplitKeys:     [][]byte{keys[30], keys[60], keys[90]},
+		TotalFileSize: int64(config.SplitRegionSize) + 1,
+		TotalKVCount:  int64(config.SplitRegionKeys) + 1,
+	}
+	engineUUID := uuid.New()
+	pdCtl := &pdutil.PdController{}
+	pdCtl.SetPDClient(&mockPdClient{})
+	local := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 2,
+		},
+		splitCli: initTestSplitClient([][]byte{
+			keys[0], keys[50], endKey,
+		}, nil),
+		pdCtl:          pdCtl,
+		externalEngine: map[uuid.UUID]common.Engine{},
+		keyAdapter:     common.NoopKeyAdapter{},
+	}
+	jobs := make([]*regionJob, 0, 5)
+
+	jobToWorkerCh := make(chan *regionJob, 10)
+	testJobToWorkerCh = jobToWorkerCh
+
+	done := make(chan struct{})
+	go func() {
+		for i := 0; i < 5; i++ {
+			jobs = append(jobs, <-jobToWorkerCh)
+			testJobWg.Done()
+		}
+	}()
+	go func() {
+		err2 := local.CloseEngine(
+			ctx,
+			&backend.EngineConfig{External: externalCfg},
+			engineUUID,
+		)
+		require.NoError(t, err2)
+		err2 = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+		require.NoError(t, err2)
+		close(done)
+	}()
+
+	<-done
+
+	// no jobs left in the channel
+	require.Len(t, jobToWorkerCh, 0)
+
+	sort.Slice(jobs, func(i, j int) bool {
+		return bytes.Compare(jobs[i].keyRange.Start, jobs[j].keyRange.Start) < 0
+	})
+	expectedKeyRanges := []common.Range{
+		{Start: keys[0], End: keys[30]},
+		{Start: keys[30], End: keys[50]},
+		{Start: keys[50], End: keys[60]},
+		{Start: keys[60], End: keys[90]},
+		{Start: keys[90], End: endKey},
+	}
+	kvIdx := 0
+	for i, job := range jobs {
+		require.Equal(t, expectedKeyRanges[i], job.keyRange)
+		iter := job.ingestData.NewIter(ctx, job.keyRange.Start, job.keyRange.End)
+		for iter.First(); iter.Valid(); iter.Next() {
+			require.Equal(t, keys[kvIdx], iter.Key())
+			require.Equal(t, values[kvIdx], iter.Value())
+			kvIdx++
+		}
+		require.NoError(t, iter.Error())
+		require.NoError(t, iter.Close())
+	}
+	require.Equal(t, 100, kvIdx)
+}
+
+func TestGetExternalEngineKVStatistics(t *testing.T) {
+	b := Backend{
+		externalEngine: map[uuid.UUID]common.Engine{},
+	}
+	// non existent uuid
+	size, count := b.GetExternalEngineKVStatistics(uuid.New())
+	require.Zero(t, size)
+	require.Zero(t, count)
+}
+>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go
index d677e9c1dc7ba..881935887c0c2 100644
--- a/br/pkg/lightning/backend/local/localhelper_test.go
+++ b/br/pkg/lightning/backend/local/localhelper_test.go
@@ -249,6 +249,16 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd
 }
 
 func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
+<<<<<<< HEAD
+=======
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+
+>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
 	if c.hook != nil {
 		key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
 	}
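
Note for reviewers resolving the conflicts: the incoming side runs job generation inside the same errgroup as the write workers, and the generator returns nil when it only observes context cancellation, so workGroup.Wait() surfaces the worker's original error (for example the "not enough disk space" failure exercised by the new test) rather than a later "context canceled" from job generation overriding it. Below is a minimal, self-contained sketch of that pattern, not part of the patch; the names run, jobCh, and errDiskFull are illustrative stand-ins for doImport, jobToWorkerCh, and the real TiKV error.

// Standalone sketch (assumed names, not the patch itself) of the error-propagation
// pattern: producer and workers share one errgroup, and the producer treats
// cancellation as a clean exit so the worker's error wins in eg.Wait().
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// errDiskFull stands in for the worker-side failure in the real code.
var errDiskFull = errors.New("the remaining storage capacity of TiKV is insufficient")

func run(ctx context.Context, jobs []int) error {
	eg, egCtx := errgroup.WithContext(ctx)
	jobCh := make(chan int)

	// Worker: fails on the first job, which cancels egCtx for the producer.
	eg.Go(func() error {
		for range jobCh {
			return errDiskFull
		}
		return nil
	})

	// Producer: on cancellation it stops and returns nil, so it never "seizes"
	// the first-error slot from the failing worker.
	eg.Go(func() error {
		defer close(jobCh)
		for _, j := range jobs {
			select {
			case <-egCtx.Done():
				return nil
			case jobCh <- j:
			}
		}
		return nil
	})

	return eg.Wait()
}

func main() {
	// Expected to print the worker's error, not "context canceled".
	fmt.Println(run(context.Background(), []int{1, 2, 3}))
}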