diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 56c5bf79d0620..cdf1c0bb7db2a 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -908,7 +908,7 @@ func (d *ddl) CleanUpTempDirLoop(path string) { if j := d.runningJobs.Load(); j != nil { runningIDs = j.cloneRunningIDs() } - ingest.CleanUpTempDir(path, runningIDs) + ingest.CleanUpTempDir(d.ctx, path, runningIDs) d.sessPool.Put(se) case <-d.ctx.Done(): return diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 04f4afa1db8ad..7d74b67f9d5b6 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1967,7 +1967,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { } taskType := proto.Backfill - taskKey := fmt.Sprintf("ddl/%s/%d", taskType, reorgInfo.Job.ID) + taskKey := ddlutil.DistTaskKey(taskType, reorgInfo.Job.ID) g, ctx := errgroup.WithContext(w.ctx) ctx = kv.WithInternalSourceType(ctx, kv.InternalDistTask) diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 8af2c74ff43f7..c0f94fea0062c 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -23,6 +23,8 @@ go_library( "//pkg/ddl/internal/session", "//pkg/ddl/logutil", "//pkg/ddl/util", + "//pkg/disttask/framework/proto", + "//pkg/disttask/framework/storage", "//pkg/kv", "//pkg/lightning/backend", "//pkg/lightning/backend/encode", @@ -79,9 +81,13 @@ go_test( "//pkg/ddl/ingest/testutil", "//pkg/ddl/internal/session", "//pkg/ddl/testutil", + "//pkg/ddl/util", "//pkg/ddl/util/callback", + "//pkg/disttask/framework/proto", + "//pkg/disttask/framework/storage", "//pkg/domain", "//pkg/errno", + "//pkg/kv", "//pkg/parser/model", "//pkg/testkit", "//pkg/testkit/testfailpoint", @@ -91,6 +97,7 @@ go_test( "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", "@org_uber_go_goleak//:goleak", ], diff --git a/pkg/ddl/ingest/env.go b/pkg/ddl/ingest/env.go index 41e3ac82fba03..2f5e25218f617 100644 --- a/pkg/ddl/ingest/env.go +++ b/pkg/ddl/ingest/env.go @@ -16,17 +16,17 @@ package ingest import ( "context" - "fmt" "os" "path/filepath" "strconv" "strings" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" - sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" + ddlutil "github.com/pingcap/tidb/pkg/ddl/util" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/lightning/log" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/memory" @@ -107,10 +107,10 @@ func GenIngestTempDataDir() (string, error) { return sortPath, nil } -// CleanUpTempDir is used to remove the stale index data. This function gets -// running DDL jobs from jobScheduler and it only removes the folders that -// related to finished jobs. -func CleanUpTempDir(path string, runningIDs map[int64]struct{}) { +// CleanUpTempDir is used to remove the stale index data. Caller will pass +// running DDL jobs from jobScheduler and it internally checks disktask system +// table to filter related jobs. +func CleanUpTempDir(ctx context.Context, path string, runningIDs map[int64]struct{}) { entries, err := os.ReadDir(path) if err != nil { if strings.Contains(err.Error(), "no such file") { @@ -144,35 +144,27 @@ func CleanUpTempDir(path string, runningIDs map[int64]struct{}) { return } + m, err2 := storage.GetTaskManager() + if err2 != nil { + logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err2)) + return + } + for id := range toCheckJobIDs { - logutil.DDLIngestLogger().Info("remove stale temp index data", - zap.Int64("jobID", id)) - p := filepath.Join(path, encodeBackendTag(id)) - err = os.RemoveAll(p) - if err != nil { - logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err)) + cnt, err3 := m.GetTaskCountByKeyPrefix(ctx, ddlutil.DistTaskKey(proto.Backfill, id)) + if err3 != nil { + logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err3)) + continue } - } -} -func filterProcessingJobIDs(ctx context.Context, se *sess.Session, jobIDs []int64) ([]int64, error) { - var sb strings.Builder - for i, id := range jobIDs { - if i != 0 { - sb.WriteString(",") + if cnt == 0 { + logutil.DDLIngestLogger().Info("remove stale temp index data", + zap.Int64("jobID", id)) + p := filepath.Join(path, encodeBackendTag(id)) + err = os.RemoveAll(p) + if err != nil { + logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err)) + } } - sb.WriteString(strconv.FormatInt(id, 10)) - } - sql := fmt.Sprintf( - "SELECT job_id FROM mysql.tidb_ddl_job WHERE job_id IN (%s) AND processing", - sb.String()) - rows, err := se.Execute(ctx, sql, "filter_processing_job_ids") - if err != nil { - return nil, errors.Trace(err) - } - ret := make([]int64, 0, len(rows)) - for _, row := range rows { - ret = append(ret, row.GetInt64(0)) } - return ret, nil } diff --git a/pkg/ddl/ingest/env_test.go b/pkg/ddl/ingest/env_test.go index 134a3d1084bad..a77ca09774ff4 100644 --- a/pkg/ddl/ingest/env_test.go +++ b/pkg/ddl/ingest/env_test.go @@ -15,13 +15,23 @@ package ingest_test import ( + "context" + "fmt" "os" "path/filepath" "testing" + "time" + "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/ingest" + ddlutil "github.com/pingcap/tidb/pkg/ddl/util" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" ) func TestGenLightningDataDir(t *testing.T) { @@ -37,29 +47,78 @@ func TestGenLightningDataDir(t *testing.T) { } func TestLitBackendCtxMgr(t *testing.T) { + ctx := context.Background() + ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask) + store := testkit.CreateMockStore(t) sortPath := t.TempDir() staleJobDir := filepath.Join(sortPath, "100") staleJobDir2 := filepath.Join(sortPath, "101") + staleJobDir3 := filepath.Join(sortPath, "102") err := os.MkdirAll(staleJobDir, 0o700) require.NoError(t, err) err = os.MkdirAll(staleJobDir2, 0o700) require.NoError(t, err) + err = os.MkdirAll(staleJobDir3, 0o700) + require.NoError(t, err) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return testkit.NewSession(t, store), nil + }, 10, 10, time.Second) + t.Cleanup(func() { + pool.Close() + }) + taskManager := storage.NewTaskManager(pool) + storage.SetTaskManager(taskManager) + t.Cleanup(func() { + storage.SetTaskManager(nil) + }) runningJobs := map[int64]struct{}{ 100: {}, - 101: {}, } + taskKey101 := ddlutil.DistTaskKey(proto.Backfill, 101) + _, err = taskManager.CreateTask( + ctx, + taskKey101, + proto.Backfill, + 1, + "", + nil, + ) + require.NoError(t, err) + // multi-schema change task key + taskKey102 := ddlutil.DistTaskKey(proto.Backfill, 102) + "/1" + _, err = taskManager.CreateTask( + ctx, + taskKey102, + proto.Backfill, + 1, + "", + nil, + ) + require.NoError(t, err) - ingest.CleanUpTempDir(sortPath, runningJobs) + ingest.CleanUpTempDir(ctx, sortPath, runningJobs) require.DirExists(t, staleJobDir) require.DirExists(t, staleJobDir2) + require.DirExists(t, staleJobDir3) - delete(runningJobs, 101) - ingest.CleanUpTempDir(sortPath, runningJobs) - require.DirExists(t, staleJobDir) + ingest.CleanUpTempDir(ctx, sortPath, nil) + require.NoDirExists(t, staleJobDir) + require.DirExists(t, staleJobDir2) + require.DirExists(t, staleJobDir3) + + // mimic task is moved to history table + tk := testkit.NewTestKit(t, store) + tk.MustExec(fmt.Sprintf("delete from mysql.tidb_global_task where task_key = '%s'", taskKey101)) + + ingest.CleanUpTempDir(ctx, sortPath, nil) + require.NoDirExists(t, staleJobDir) require.NoDirExists(t, staleJobDir2) + require.DirExists(t, staleJobDir3) - ingest.CleanUpTempDir(sortPath, nil) + tk.MustExec(fmt.Sprintf("delete from mysql.tidb_global_task where task_key = '%s'", taskKey102)) + ingest.CleanUpTempDir(ctx, sortPath, nil) require.NoDirExists(t, staleJobDir) require.NoDirExists(t, staleJobDir2) + require.NoDirExists(t, staleJobDir3) } diff --git a/pkg/ddl/util/BUILD.bazel b/pkg/ddl/util/BUILD.bazel index e03ba28d779a7..fd10661901093 100644 --- a/pkg/ddl/util/BUILD.bazel +++ b/pkg/ddl/util/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ddl/logutil", + "//pkg/disttask/framework/proto", "//pkg/kv", "//pkg/parser/model", "//pkg/parser/terror", diff --git a/pkg/ddl/util/util.go b/pkg/ddl/util/util.go index 6f088e0646854..e93171d8b77bc 100644 --- a/pkg/ddl/util/util.go +++ b/pkg/ddl/util/util.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/logutil" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/terror" @@ -425,3 +426,8 @@ func GenKeyExistsErr(key, value []byte, idxInfo *model.IndexInfo, tblInfo *model } return kv.GenKeyExistsErr(valueStr, indexName) } + +// DistTaskKey generates the key for the dist task. +func DistTaskKey(taskType proto.TaskType, jobID int64) string { + return fmt.Sprintf("ddl/%s/%d", taskType, jobID) +} diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 1fb3e1b0746cd..fe83202890871 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -375,6 +375,16 @@ func (mgr *TaskManager) GetTaskByKey(ctx context.Context, key string) (task *pro return Row2Task(rs[0]), nil } +// GetTaskCountByKeyPrefix gets the task count by the task key prefix. +func (mgr *TaskManager) GetTaskCountByKeyPrefix(ctx context.Context, keyPrefix string) (int64, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select count(*) from mysql.tidb_global_task t where task_key like %?", keyPrefix+"%") + if err != nil { + return 0, err + } + + return rs[0].GetInt64(0), nil +} + // GetTaskByKeyWithHistory gets the task from history table by the task key. func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error) { rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+TaskColumns+" from mysql.tidb_global_task t where task_key = %?"+