Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: limit the concurrent number of ingest jobs to 1 (#43210) #43262

Merged
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ go_test(
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/internal/callback",
"//ddl/placement",
"//ddl/schematracker",
Expand Down
45 changes: 45 additions & 0 deletions ddl/backfilling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ package ddl

import (
"bytes"
"context"
"testing"

"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/stretchr/testify/require"
)

Expand All @@ -43,3 +48,43 @@ func TestDoneTaskKeeper(t *testing.T) {
n.updateNextKey(6, kv.Key("h"))
require.True(t, bytes.Equal(n.nextKey, kv.Key("h")))
}

func TestPickBackfillType(t *testing.T) {
originMgr := ingest.LitBackCtxMgr
originInit := ingest.LitInitialized
originFastReorg := variable.EnableFastReorg.Load()
defer func() {
ingest.LitBackCtxMgr = originMgr
ingest.LitInitialized = originInit
variable.EnableFastReorg.Store(originFastReorg)
}()
mockMgr := ingest.NewMockBackendCtxMgr(
func() sessionctx.Context {
return nil
})
ingest.LitBackCtxMgr = mockMgr
mockCtx := context.Background()
const uk = false
mockJob := &model.Job{
ID: 1,
ReorgMeta: &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeTxn,
},
}
variable.EnableFastReorg.Store(true)
tp, err := pickBackfillType(mockCtx, mockJob, uk)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeTxn)

mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
ingest.LitInitialized = false
tp, err = pickBackfillType(mockCtx, mockJob, uk)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeTxnMerge)

mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
ingest.LitInitialized = true
tp, err = pickBackfillType(mockCtx, mockJob, uk)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeLitMerge)
}
76 changes: 50 additions & 26 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
switch indexInfo.State {
case model.StateNone:
// none -> delete only
reorgTp := pickBackfillType(job)
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
if err != nil {
break
}
if reorgTp.NeedMergeProcess() {
// Increase telemetryAddIndexIngestUsage
telemetryAddIndexIngestUsage.Inc()
Expand Down Expand Up @@ -711,39 +715,54 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

// pickBackfillType determines which backfill process will be used.
func pickBackfillType(job *model.Job) model.ReorgType {
func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) {
if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
// The backfill task has been started.
// Don't switch the backfill process.
return job.ReorgMeta.ReorgTp
// Don't change the backfill type.
return job.ReorgMeta.ReorgTp, nil
}
if IsEnableFastReorg() {
var useIngest bool
if ingest.LitInitialized && ingest.LitBackCtxMgr.Available() {
cleanupSortPath(job.ID)
if !IsEnableFastReorg() {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
return model.ReorgTypeTxn, nil
}
if ingest.LitInitialized {
available, err := ingest.LitBackCtxMgr.CheckAvailable()
if err != nil {
return model.ReorgTypeNone, err
}
if available {
err = cleanupSortPath(job.ID)
if err != nil {
return model.ReorgTypeNone, err
}
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID)
if err != nil {
return model.ReorgTypeNone, err
}
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge, nil
}
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
zap.Bool("lightning env initialized", ingest.LitInitialized),
zap.Bool("can use ingest", useIngest))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
return model.ReorgTypeTxnMerge
}
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
return model.ReorgTypeTxn
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
zap.Bool("lightning env initialized", false))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
return model.ReorgTypeTxnMerge, nil
}

// cleanupSortPath is used to clean up the temp data of the previous jobs.
// Because we don't remove all the files after the support of checkpoint,
// there maybe some stale files in the sort path if TiDB is killed during the backfill process.
func cleanupSortPath(currentJobID int64) {
func cleanupSortPath(currentJobID int64) error {
sortPath := ingest.ConfigSortPath()
err := os.MkdirAll(sortPath, 0700)
if err != nil {
return errors.Trace(err)
}
entries, err := os.ReadDir(sortPath)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
return
logutil.BgLogger().Warn("[ddl-ingest] cannot read sort path", zap.Error(err))
return errors.Trace(err)
}
for _, entry := range entries {
if !entry.IsDir() {
Expand All @@ -762,10 +781,11 @@ func cleanupSortPath(currentJobID int64) {
err := os.RemoveAll(filepath.Join(sortPath, entry.Name()))
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
return
return nil
}
}
}
return nil
}

// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
Expand Down Expand Up @@ -824,17 +844,21 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
bfProcess := pickBackfillType(job)
if !bfProcess.NeedMergeProcess() {
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
if err != nil {
return false, ver, err
}
if !reorgTp.NeedMergeProcess() {
return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
}
switch indexInfo.BackfillState {
case model.BackfillStateRunning:
logutil.BgLogger().Info("[ddl] index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge),
zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
zap.String("index", indexInfo.Name.O))
switch bfProcess {
switch reorgTp {
case model.ReorgTypeLitMerge:
if job.ReorgMeta.IsDistReorg {
done, ver, err = runIngestReorgJobDist(w, d, t, job, tbl, indexInfo)
Expand All @@ -854,7 +878,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
indexInfo.BackfillState = model.BackfillStateMerging
if bfProcess == model.ReorgTypeLitMerge {
if reorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg.
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 7,
shard_count = 8,
deps = [
":ingest",
"//config",
Expand Down
18 changes: 13 additions & 5 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
Available() bool
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
Expand All @@ -53,19 +53,27 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
LitMemRoot = mgr.memRoot
LitDiskRoot = mgr.diskRoot
LitDiskRoot.UpdateUsage()
err := LitDiskRoot.StartupCheck()
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] ingest backfill may not be available", zap.Error(err))
}
return mgr
}

// Available checks if the ingest backfill is available.
func (m *litBackendCtxMgr) Available() bool {
// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
activeJobIDs := m.Keys()
if len(activeJobIDs) > 0 {
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job",
zap.Int64("job ID", activeJobIDs[0]))
return false
return false, nil
}
if err := m.diskRoot.PreCheckUsage(); err != nil {
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is not available", zap.Error(err))
return false, err
}
return true
return true, nil
}

// Register creates a new backend and registers it to the backend context.
Expand Down
36 changes: 36 additions & 0 deletions ddl/ingest/disk_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"sync"

"github.com/pingcap/errors"
lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -29,6 +30,8 @@ type DiskRoot interface {
UpdateUsage()
ShouldImport() bool
UsageInfo() string
PreCheckUsage() error
StartupCheck() error
}

const capacityThreshold = 0.9
Expand Down Expand Up @@ -87,3 +90,36 @@ func (d *diskRootImpl) UsageInfo() string {
defer d.mu.RUnlock()
return fmt.Sprintf("disk usage: %d/%d, backend usage: %d", d.used, d.capacity, d.bcUsed)
}

// PreCheckUsage implements DiskRoot interface.
func (d *diskRootImpl) PreCheckUsage() error {
sz, err := lcom.GetStorageSize(d.path)
if err != nil {
return errors.Trace(err)
}
if RiskOfDiskFull(sz.Available, sz.Capacity) {
sortPath := ConfigSortPath()
return errors.Errorf("sort path: %s, %s, please clean up the disk and retry", sortPath, d.UsageInfo())
}
return nil
}

// StartupCheck implements DiskRoot interface.
func (d *diskRootImpl) StartupCheck() error {
sz, err := lcom.GetStorageSize(d.path)
if err != nil {
return errors.Trace(err)
}
quota := variable.DDLDiskQuota.Load()
if sz.Available < quota {
sortPath := ConfigSortPath()
return errors.Errorf("the available disk space(%d) in %s should be greater than @@tidb_ddl_disk_quota(%d)",
sz.Available, sortPath, quota)
}
return nil
}

// RiskOfDiskFull checks if the disk has less than 10% space.
func RiskOfDiskFull(available, capacity uint64) bool {
return float64(available) < (1-capacityThreshold)*float64(capacity)
}
6 changes: 6 additions & 0 deletions ddl/ingest/mem_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,9 @@ func TestMemoryRoot(t *testing.T) {
memRoot.Consume(10) // Mix usage of tag and non-tag.
require.Equal(t, int64(522), memRoot.CurrentUsage())
}

func TestRiskOfDiskFull(t *testing.T) {
require.False(t, ingest.RiskOfDiskFull(11, 100))
require.False(t, ingest.RiskOfDiskFull(10, 100))
require.True(t, ingest.RiskOfDiskFull(9, 100))
}
6 changes: 3 additions & 3 deletions ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}
}

// Available implements BackendCtxMgr.Available interface.
func (*MockBackendCtxMgr) Available() bool {
return true
// CheckAvailable implements BackendCtxMgr.Available interface.
func (*MockBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

// Register implements BackendCtxMgr.Register interface.
Expand Down
4 changes: 3 additions & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func cleanupLocalIndexData(jobID int64) {
sortPath := ingest.ConfigSortPath()
f := filepath.Join(sortPath, ingest.EncodeBackendTag(jobID))
err := os.RemoveAll(f)
logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err))
if err != nil {
logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err))
}
}

// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,
Expand Down
4 changes: 3 additions & 1 deletion tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,9 @@ func TestAddIndexIngestCancel(t *testing.T) {
tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob)
require.True(t, cancelled)
dom.DDL().SetHook(defHook)
require.True(t, ingest.LitBackCtxMgr.Available())
ok, err := ingest.LitBackCtxMgr.CheckAvailable()
require.NoError(t, err)
require.True(t, ok)
}

func TestAddIndexSplitTableRanges(t *testing.T) {
Expand Down