
Commit

This is an automated cherry-pick of pingcap#48185
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
D3Hunter authored and ti-chi-bot committed Nov 1, 2023
1 parent 699227f commit 08f7acd
Showing 4 changed files with 251 additions and 4 deletions.
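
Before the per-file diffs, here is a minimal, hedged sketch of the error-propagation pattern this fix adopts (doImportSketch, runOneJob and the int-typed jobs are illustrative stand-ins, not the real regionJob machinery): job generation runs inside the same errgroup as the write workers and returns nil once the group context is canceled, so eg.Wait() reports the worker's original failure (for example, the simulated "not enough disk space" error exercised by the tests below) instead of letting a later job-generation error override it.

package main

import (
    "context"
    "errors"
    "fmt"

    "golang.org/x/sync/errgroup"
)

// errNoDiskSpace stands in for the worker-side failure the tests below simulate.
var errNoDiskSpace = errors.New("the remaining storage capacity of TiKV is insufficient")

// runOneJob is an illustrative worker body: the first job fails, mimicking the
// WriteToTiKVNotEnoughDiskSpace failpoint.
func runOneJob(ctx context.Context, job int) error {
    if job == 0 {
        return errNoDiskSpace
    }
    return nil
}

func doImportSketch(ctx context.Context, jobs []int) error {
    eg, egCtx := errgroup.WithContext(ctx)
    jobCh := make(chan int)

    // Workers: the error worth reporting originates here.
    for i := 0; i < 2; i++ {
        eg.Go(func() error {
            for job := range jobCh {
                if err := runOneJob(egCtx, job); err != nil {
                    return err // cancels egCtx, which also stops the generator below
                }
            }
            return nil
        })
    }

    // Job generation runs in the SAME group. When egCtx is canceled because a
    // worker failed, the generator returns nil rather than "context canceled",
    // so eg.Wait() surfaces the worker's error instead of overriding it.
    eg.Go(func() error {
        defer close(jobCh)
        for _, j := range jobs {
            select {
            case <-egCtx.Done():
                return nil
            case jobCh <- j:
            }
        }
        return nil
    })

    return eg.Wait()
}

func main() {
    fmt.Println(doImportSketch(context.Background(), []int{0, 1, 2}))
}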
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
@@ -952,6 +952,9 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
LowerBound: lowerBound,
UpperBound: upperBound,
}
failpoint.Inject("mockGetFirstAndLastKey", func() {
failpoint.Return(lowerBound, upperBound, nil)
})

iter := e.newKVIter(context.Background(), opt)
//nolint: errcheck
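A hedged note on the failpoint added above: failpoint.Inject is a no-op until the named failpoint is enabled, so getFirstAndLastKey only short-circuits to (lowerBound, upperBound, nil) when a test switches the mock on. The sketch below uses a hypothetical test name and assumes the test file's usual testing, failpoint and require imports; the enable/disable pattern mirrors what local_test.go does further down.

// Hedged sketch: toggling the mockGetFirstAndLastKey failpoint from a test
// in the same package; the failpoint path follows the package import path.
func TestMockGetFirstAndLastKeySketch(t *testing.T) {
    fp := "github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey"
    require.NoError(t, failpoint.Enable(fp, "return()"))
    t.Cleanup(func() {
        require.NoError(t, failpoint.Disable(fp))
    })
    // While enabled, getFirstAndLastKey returns the bounds it was given
    // without opening an iterator, so tests can use an empty Engine.
}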
85 changes: 81 additions & 4 deletions br/pkg/lightning/backend/local/local.go
@@ -1121,10 +1121,63 @@ func (local *Backend) generateAndSendJob(
) error {
logger := log.FromContext(ctx)

<<<<<<< HEAD
// when use dynamic region feature, the region may be very big, we need
// to split to smaller ranges to increase the concurrency.
if regionSplitSize > 2*int64(config.SplitRegionSize) {
sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter)
=======
logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))

eg, egCtx := errgroup.WithContext(ctx)

dataAndRangeCh := make(chan common.DataAndRange)
for i := 0; i < local.WorkerConcurrency; i++ {
eg.Go(func() error {
for {
select {
case <-egCtx.Done():
return nil
case p, ok := <-dataAndRangeCh:
if !ok {
return nil
}

failpoint.Inject("beforeGenerateJob", nil)
failpoint.Inject("sendDummyJob", func(_ failpoint.Value) {
// this is used to trigger worker failure, used together
// with WriteToTiKVNotEnoughDiskSpace
jobToWorkerCh <- &regionJob{}
time.Sleep(5 * time.Second)
})
jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys)
if err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return err
}
for _, job := range jobs {
job.ref(jobWg)
select {
case <-egCtx.Done():
// this job is not put into jobToWorkerCh
job.done(jobWg)
// if the context is canceled, it means worker has error, the first error can be
// found by worker's error group LATER. if this function returns an error it will
// seize the "first error".
return nil
case jobToWorkerCh <- job:
}
}
}
}
})
}

eg.Go(func() error {
err := engine.LoadIngestData(egCtx, jobRanges, dataAndRangeCh)
>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
if err != nil {
return errors.Trace(err)
}
@@ -1548,6 +1601,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
})
}

<<<<<<< HEAD
err := local.prepareAndSendJob(
workerCtx,
engine,
@@ -1563,11 +1617,34 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
_ = workGroup.Wait()
return firstErr.Get()
}
=======
failpoint.Label("afterStartWorker")

workGroup.Go(func() error {
err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
return err
}
>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))

jobWg.Wait()
workerCancel()
firstErr.Set(workGroup.Wait())
firstErr.Set(ctx.Err())
jobWg.Wait()
workerCancel()
return nil
})
if err := workGroup.Wait(); err != nil {
if !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}
firstErr.Set(err)
}
return firstErr.Get()
}

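The generateAndSendJob hunk above pairs job.ref(jobWg) with job.done(jobWg) whenever a generated job cannot be handed to a worker. A hedged sketch of that bookkeeping idea follows (regionJobSketch, its refCount field and sendOrDrop are assumptions for illustration, not the real regionJob implementation, and the snippet assumes the usual context, sync and sync/atomic imports): each in-flight job holds one slot on a shared WaitGroup and gives it back immediately if cancellation wins the send, so the later jobWg.Wait() in doImport cannot block on jobs that were never delivered.

// Hedged sketch of reference-counted jobs tracked by a shared WaitGroup.
type regionJobSketch struct {
    refCount atomic.Int32
}

// ref marks the job as in flight: one WaitGroup slot per outstanding reference.
func (j *regionJobSketch) ref(wg *sync.WaitGroup) {
    j.refCount.Add(1)
    wg.Add(1)
}

// done releases one reference so jobWg.Wait() can eventually proceed.
func (j *regionJobSketch) done(wg *sync.WaitGroup) {
    j.refCount.Add(-1)
    wg.Done()
}

// sendOrDrop mirrors the pattern in the hunk above: take a reference before
// trying to send, release it on the spot if cancellation wins the select.
func sendOrDrop(ctx context.Context, job *regionJobSketch, out chan<- *regionJobSketch, wg *sync.WaitGroup) bool {
    job.ref(wg)
    select {
    case <-ctx.Done():
        job.done(wg) // the job never reaches a worker, so release it here
        return false
    case out <- job:
        return true
    }
}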
157 changes: 157 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
@@ -2085,3 +2085,160 @@ func TestCtxCancelIsIgnored(t *testing.T) {
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}
<<<<<<< HEAD
=======

func TestWorkerFailedWhenGeneratingJobs(t *testing.T) {
backup := maxRetryBackoffSecond
maxRetryBackoffSecond = 1
t.Cleanup(func() {
maxRetryBackoffSecond = backup
})

_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
})

initRanges := []common.Range{
{Start: []byte{'c'}, End: []byte{'d'}},
}

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 1,
},
splitCli: initTestSplitClient(
[][]byte{{1}, {11}},
panicSplitRegionClient{},
),
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

func TestExternalEngine(t *testing.T) {
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables")
})
ctx := context.Background()
dir := t.TempDir()
storageURI := "file://" + filepath.ToSlash(dir)
storeBackend, err := storage.ParseBackend(storageURI, nil)
require.NoError(t, err)
extStorage, err := storage.New(ctx, storeBackend, nil)
require.NoError(t, err)
keys := make([][]byte, 100)
values := make([][]byte, 100)
for i := range keys {
keys[i] = []byte(fmt.Sprintf("key%06d", i))
values[i] = []byte(fmt.Sprintf("value%06d", i))
}
// simple append 0x00
endKey := make([]byte, len(keys[99])+1)
copy(endKey, keys[99])

dataFiles, statFiles, err := external.MockExternalEngine(extStorage, keys, values)
require.NoError(t, err)

externalCfg := &backend.ExternalEngineConfig{
StorageURI: storageURI,
DataFiles: dataFiles,
StatFiles: statFiles,
StartKey: keys[0],
EndKey: endKey,
SplitKeys: [][]byte{keys[30], keys[60], keys[90]},
TotalFileSize: int64(config.SplitRegionSize) + 1,
TotalKVCount: int64(config.SplitRegionKeys) + 1,
}
engineUUID := uuid.New()
pdCtl := &pdutil.PdController{}
pdCtl.SetPDClient(&mockPdClient{})
local := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 2,
},
splitCli: initTestSplitClient([][]byte{
keys[0], keys[50], endKey,
}, nil),
pdCtl: pdCtl,
externalEngine: map[uuid.UUID]common.Engine{},
keyAdapter: common.NoopKeyAdapter{},
}
jobs := make([]*regionJob, 0, 5)

jobToWorkerCh := make(chan *regionJob, 10)
testJobToWorkerCh = jobToWorkerCh

done := make(chan struct{})
go func() {
for i := 0; i < 5; i++ {
jobs = append(jobs, <-jobToWorkerCh)
testJobWg.Done()
}
}()
go func() {
err2 := local.CloseEngine(
ctx,
&backend.EngineConfig{External: externalCfg},
engineUUID,
)
require.NoError(t, err2)
err2 = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.NoError(t, err2)
close(done)
}()

<-done

// no jobs left in the channel
require.Len(t, jobToWorkerCh, 0)

sort.Slice(jobs, func(i, j int) bool {
return bytes.Compare(jobs[i].keyRange.Start, jobs[j].keyRange.Start) < 0
})
expectedKeyRanges := []common.Range{
{Start: keys[0], End: keys[30]},
{Start: keys[30], End: keys[50]},
{Start: keys[50], End: keys[60]},
{Start: keys[60], End: keys[90]},
{Start: keys[90], End: endKey},
}
kvIdx := 0
for i, job := range jobs {
require.Equal(t, expectedKeyRanges[i], job.keyRange)
iter := job.ingestData.NewIter(ctx, job.keyRange.Start, job.keyRange.End)
for iter.First(); iter.Valid(); iter.Next() {
require.Equal(t, keys[kvIdx], iter.Key())
require.Equal(t, values[kvIdx], iter.Value())
kvIdx++
}
require.NoError(t, iter.Error())
require.NoError(t, iter.Close())
}
require.Equal(t, 100, kvIdx)
}

func TestGetExternalEngineKVStatistics(t *testing.T) {
b := Backend{
externalEngine: map[uuid.UUID]common.Engine{},
}
// non existent uuid
size, count := b.GetExternalEngineKVStatistics(uuid.New())
require.Zero(t, size)
require.Zero(t, count)
}
>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -249,6 +249,16 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd
}

func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
<<<<<<< HEAD
=======
c.mu.Lock()
defer c.mu.Unlock()

if err := ctx.Err(); err != nil {
return nil, err
}

>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185))
if c.hook != nil {
key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
}
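The ScanRegions change above makes the shared test split client honor cancellation: once a worker fails and the group context is canceled, further region scans return ctx.Err() instead of fabricated regions, which lets the new tests finish promptly. The same guard works for any context-aware mock method, as in this hedged sketch (fakeClient and its Scan signature are illustrative, assuming a context import).

// Hedged sketch: check the context first so canceled callers get ctx.Err()
// back instead of synthetic results.
type fakeClient struct{}

func (c *fakeClient) Scan(ctx context.Context, key, endKey []byte, limit int) ([][]byte, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }
    // normal mock behaviour would go here
    return nil, nil
}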

