Skip to content

Commit e96e604

Browse files
authored
ddl: limit the concurrent number of ingest jobs to 1 (#43210) (#43262)
close #42903
1 parent db2f585 commit e96e604

10 files changed

+161
-37
lines changed

ddl/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ go_test(
220220
deps = [
221221
"//autoid_service",
222222
"//config",
223+
"//ddl/ingest",
223224
"//ddl/internal/callback",
224225
"//ddl/placement",
225226
"//ddl/schematracker",

ddl/backfilling_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,14 @@ package ddl
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"testing"
2021

22+
"github.com/pingcap/tidb/ddl/ingest"
2123
"github.com/pingcap/tidb/kv"
24+
"github.com/pingcap/tidb/parser/model"
25+
"github.com/pingcap/tidb/sessionctx"
26+
"github.com/pingcap/tidb/sessionctx/variable"
2227
"github.com/stretchr/testify/require"
2328
)
2429

@@ -43,3 +48,43 @@ func TestDoneTaskKeeper(t *testing.T) {
4348
n.updateNextKey(6, kv.Key("h"))
4449
require.True(t, bytes.Equal(n.nextKey, kv.Key("h")))
4550
}
51+
52+
func TestPickBackfillType(t *testing.T) {
53+
originMgr := ingest.LitBackCtxMgr
54+
originInit := ingest.LitInitialized
55+
originFastReorg := variable.EnableFastReorg.Load()
56+
defer func() {
57+
ingest.LitBackCtxMgr = originMgr
58+
ingest.LitInitialized = originInit
59+
variable.EnableFastReorg.Store(originFastReorg)
60+
}()
61+
mockMgr := ingest.NewMockBackendCtxMgr(
62+
func() sessionctx.Context {
63+
return nil
64+
})
65+
ingest.LitBackCtxMgr = mockMgr
66+
mockCtx := context.Background()
67+
const uk = false
68+
mockJob := &model.Job{
69+
ID: 1,
70+
ReorgMeta: &model.DDLReorgMeta{
71+
ReorgTp: model.ReorgTypeTxn,
72+
},
73+
}
74+
variable.EnableFastReorg.Store(true)
75+
tp, err := pickBackfillType(mockCtx, mockJob, uk)
76+
require.NoError(t, err)
77+
require.Equal(t, tp, model.ReorgTypeTxn)
78+
79+
mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
80+
ingest.LitInitialized = false
81+
tp, err = pickBackfillType(mockCtx, mockJob, uk)
82+
require.NoError(t, err)
83+
require.Equal(t, tp, model.ReorgTypeTxnMerge)
84+
85+
mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
86+
ingest.LitInitialized = true
87+
tp, err = pickBackfillType(mockCtx, mockJob, uk)
88+
require.NoError(t, err)
89+
require.Equal(t, tp, model.ReorgTypeLitMerge)
90+
}

ddl/index.go

+50-26
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
622622
switch indexInfo.State {
623623
case model.StateNone:
624624
// none -> delete only
625-
reorgTp := pickBackfillType(job)
625+
var reorgTp model.ReorgType
626+
reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
627+
if err != nil {
628+
break
629+
}
626630
if reorgTp.NeedMergeProcess() {
627631
// Increase telemetryAddIndexIngestUsage
628632
telemetryAddIndexIngestUsage.Inc()
@@ -711,39 +715,54 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
711715
}
712716

713717
// pickBackfillType determines which backfill process will be used.
714-
func pickBackfillType(job *model.Job) model.ReorgType {
718+
func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) {
715719
if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
716720
// The backfill task has been started.
717-
// Don't switch the backfill process.
718-
return job.ReorgMeta.ReorgTp
721+
// Don't change the backfill type.
722+
return job.ReorgMeta.ReorgTp, nil
719723
}
720-
if IsEnableFastReorg() {
721-
var useIngest bool
722-
if ingest.LitInitialized && ingest.LitBackCtxMgr.Available() {
723-
cleanupSortPath(job.ID)
724+
if !IsEnableFastReorg() {
725+
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
726+
return model.ReorgTypeTxn, nil
727+
}
728+
if ingest.LitInitialized {
729+
available, err := ingest.LitBackCtxMgr.CheckAvailable()
730+
if err != nil {
731+
return model.ReorgTypeNone, err
732+
}
733+
if available {
734+
err = cleanupSortPath(job.ID)
735+
if err != nil {
736+
return model.ReorgTypeNone, err
737+
}
738+
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID)
739+
if err != nil {
740+
return model.ReorgTypeNone, err
741+
}
724742
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
725-
return model.ReorgTypeLitMerge
743+
return model.ReorgTypeLitMerge, nil
726744
}
727-
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
728-
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
729-
zap.Bool("lightning env initialized", ingest.LitInitialized),
730-
zap.Bool("can use ingest", useIngest))
731-
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
732-
return model.ReorgTypeTxnMerge
733745
}
734-
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
735-
return model.ReorgTypeTxn
746+
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
747+
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
748+
zap.Bool("lightning env initialized", false))
749+
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
750+
return model.ReorgTypeTxnMerge, nil
736751
}
737752

738753
// cleanupSortPath is used to clean up the temp data of the previous jobs.
739754
// Because we don't remove all the files after the support of checkpoint,
740755
// there maybe some stale files in the sort path if TiDB is killed during the backfill process.
741-
func cleanupSortPath(currentJobID int64) {
756+
func cleanupSortPath(currentJobID int64) error {
742757
sortPath := ingest.ConfigSortPath()
758+
err := os.MkdirAll(sortPath, 0700)
759+
if err != nil {
760+
return errors.Trace(err)
761+
}
743762
entries, err := os.ReadDir(sortPath)
744763
if err != nil {
745-
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
746-
return
764+
logutil.BgLogger().Warn("[ddl-ingest] cannot read sort path", zap.Error(err))
765+
return errors.Trace(err)
747766
}
748767
for _, entry := range entries {
749768
if !entry.IsDir() {
@@ -762,10 +781,11 @@ func cleanupSortPath(currentJobID int64) {
762781
err := os.RemoveAll(filepath.Join(sortPath, entry.Name()))
763782
if err != nil {
764783
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
765-
return
784+
return nil
766785
}
767786
}
768787
}
788+
return nil
769789
}
770790

771791
// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
@@ -824,17 +844,21 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo
824844

825845
func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
826846
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
827-
bfProcess := pickBackfillType(job)
828-
if !bfProcess.NeedMergeProcess() {
847+
var reorgTp model.ReorgType
848+
reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
849+
if err != nil {
850+
return false, ver, err
851+
}
852+
if !reorgTp.NeedMergeProcess() {
829853
return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
830854
}
831855
switch indexInfo.BackfillState {
832856
case model.BackfillStateRunning:
833857
logutil.BgLogger().Info("[ddl] index backfill state running",
834858
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
835-
zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge),
859+
zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
836860
zap.String("index", indexInfo.Name.O))
837-
switch bfProcess {
861+
switch reorgTp {
838862
case model.ReorgTypeLitMerge:
839863
if job.ReorgMeta.IsDistReorg {
840864
done, ver, err = runIngestReorgJobDist(w, d, t, job, tbl, indexInfo)
@@ -854,7 +878,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
854878
logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID),
855879
zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
856880
indexInfo.BackfillState = model.BackfillStateMerging
857-
if bfProcess == model.ReorgTypeLitMerge {
881+
if reorgTp == model.ReorgTypeLitMerge {
858882
ingest.LitBackCtxMgr.Unregister(job.ID)
859883
}
860884
job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg.

ddl/ingest/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ go_test(
5858
],
5959
flaky = True,
6060
race = "on",
61-
shard_count = 7,
61+
shard_count = 8,
6262
deps = [
6363
":ingest",
6464
"//config",

ddl/ingest/backend_mgr.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
// BackendCtxMgr is used to manage the backend context.
3232
type BackendCtxMgr interface {
33-
Available() bool
33+
CheckAvailable() (bool, error)
3434
Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error)
3535
Unregister(jobID int64)
3636
Load(jobID int64) (BackendCtx, bool)
@@ -53,19 +53,27 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
5353
LitMemRoot = mgr.memRoot
5454
LitDiskRoot = mgr.diskRoot
5555
LitDiskRoot.UpdateUsage()
56+
err := LitDiskRoot.StartupCheck()
57+
if err != nil {
58+
logutil.BgLogger().Warn("[ddl-ingest] ingest backfill may not be available", zap.Error(err))
59+
}
5660
return mgr
5761
}
5862

59-
// Available checks if the ingest backfill is available.
60-
func (m *litBackendCtxMgr) Available() bool {
63+
// CheckAvailable checks if the ingest backfill is available.
64+
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
6165
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
6266
activeJobIDs := m.Keys()
6367
if len(activeJobIDs) > 0 {
6468
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job",
6569
zap.Int64("job ID", activeJobIDs[0]))
66-
return false
70+
return false, nil
71+
}
72+
if err := m.diskRoot.PreCheckUsage(); err != nil {
73+
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is not available", zap.Error(err))
74+
return false, err
6775
}
68-
return true
76+
return true, nil
6977
}
7078

7179
// Register creates a new backend and registers it to the backend context.

ddl/ingest/disk_root.go

+36
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919
"sync"
2020

21+
"github.com/pingcap/errors"
2122
lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
2223
"github.com/pingcap/tidb/sessionctx/variable"
2324
"github.com/pingcap/tidb/util/logutil"
@@ -29,6 +30,8 @@ type DiskRoot interface {
2930
UpdateUsage()
3031
ShouldImport() bool
3132
UsageInfo() string
33+
PreCheckUsage() error
34+
StartupCheck() error
3235
}
3336

3437
const capacityThreshold = 0.9
@@ -87,3 +90,36 @@ func (d *diskRootImpl) UsageInfo() string {
8790
defer d.mu.RUnlock()
8891
return fmt.Sprintf("disk usage: %d/%d, backend usage: %d", d.used, d.capacity, d.bcUsed)
8992
}
93+
94+
// PreCheckUsage implements DiskRoot interface.
95+
func (d *diskRootImpl) PreCheckUsage() error {
96+
sz, err := lcom.GetStorageSize(d.path)
97+
if err != nil {
98+
return errors.Trace(err)
99+
}
100+
if RiskOfDiskFull(sz.Available, sz.Capacity) {
101+
sortPath := ConfigSortPath()
102+
return errors.Errorf("sort path: %s, %s, please clean up the disk and retry", sortPath, d.UsageInfo())
103+
}
104+
return nil
105+
}
106+
107+
// StartupCheck implements DiskRoot interface.
108+
func (d *diskRootImpl) StartupCheck() error {
109+
sz, err := lcom.GetStorageSize(d.path)
110+
if err != nil {
111+
return errors.Trace(err)
112+
}
113+
quota := variable.DDLDiskQuota.Load()
114+
if sz.Available < quota {
115+
sortPath := ConfigSortPath()
116+
return errors.Errorf("the available disk space(%d) in %s should be greater than @@tidb_ddl_disk_quota(%d)",
117+
sz.Available, sortPath, quota)
118+
}
119+
return nil
120+
}
121+
122+
// RiskOfDiskFull checks if the disk has less than 10% space.
123+
func RiskOfDiskFull(available, capacity uint64) bool {
124+
return float64(available) < (1-capacityThreshold)*float64(capacity)
125+
}

ddl/ingest/mem_root_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,9 @@ func TestMemoryRoot(t *testing.T) {
5858
memRoot.Consume(10) // Mix usage of tag and non-tag.
5959
require.Equal(t, int64(522), memRoot.CurrentUsage())
6060
}
61+
62+
func TestRiskOfDiskFull(t *testing.T) {
63+
require.False(t, ingest.RiskOfDiskFull(11, 100))
64+
require.False(t, ingest.RiskOfDiskFull(10, 100))
65+
require.True(t, ingest.RiskOfDiskFull(9, 100))
66+
}

ddl/ingest/mock.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
3939
}
4040
}
4141

42-
// Available implements BackendCtxMgr.Available interface.
43-
func (*MockBackendCtxMgr) Available() bool {
44-
return true
42+
// CheckAvailable implements BackendCtxMgr.Available interface.
43+
func (*MockBackendCtxMgr) CheckAvailable() (bool, error) {
44+
return true, nil
4545
}
4646

4747
// Register implements BackendCtxMgr.Register interface.

ddl/rollingback.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ func cleanupLocalIndexData(jobID int64) {
8888
sortPath := ingest.ConfigSortPath()
8989
f := filepath.Join(sortPath, ingest.EncodeBackendTag(jobID))
9090
err := os.RemoveAll(f)
91-
logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err))
91+
if err != nil {
92+
logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err))
93+
}
9294
}
9395

9496
// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,

tests/realtikvtest/addindextest/integration_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,9 @@ func TestAddIndexIngestCancel(t *testing.T) {
395395
tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob)
396396
require.True(t, cancelled)
397397
dom.DDL().SetHook(defHook)
398-
require.True(t, ingest.LitBackCtxMgr.Available())
398+
ok, err := ingest.LitBackCtxMgr.CheckAvailable()
399+
require.NoError(t, err)
400+
require.True(t, ok)
399401
}
400402

401403
func TestAddIndexSplitTableRanges(t *testing.T) {

0 commit comments

Comments
 (0)