Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Jul 11, 2024
1 parent c56c193 commit 86d027f
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ func (d *ddl) CleanUpTempDirLoop(path string) {
if j := d.runningJobs.Load(); j != nil {
runningIDs = j.cloneRunningIDs()
}
ingest.CleanUpTempDir(path, runningIDs)
ingest.CleanUpTempDir(d.ctx, path, runningIDs)
d.sessPool.Put(se)
case <-d.ctx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,7 +1967,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
}

taskType := proto.Backfill
taskKey := fmt.Sprintf("ddl/%s/%d", taskType, reorgInfo.Job.ID)
taskKey := ddlutil.DistTaskKey(taskType, reorgInfo.Job.ID)
g, ctx := errgroup.WithContext(w.ctx)
ctx = kv.WithInternalSourceType(ctx, kv.InternalDistTask)

Expand Down
7 changes: 7 additions & 0 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ go_library(
"//pkg/ddl/internal/session",
"//pkg/ddl/logutil",
"//pkg/ddl/util",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/kv",
"//pkg/lightning/backend",
"//pkg/lightning/backend/encode",
Expand Down Expand Up @@ -79,9 +81,13 @@ go_test(
"//pkg/ddl/ingest/testutil",
"//pkg/ddl/internal/session",
"//pkg/ddl/testutil",
"//pkg/ddl/util",
"//pkg/ddl/util/callback",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/domain",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
Expand All @@ -91,6 +97,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_goleak//:goleak",
],
Expand Down
58 changes: 25 additions & 33 deletions pkg/ddl/ingest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ package ingest

import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/memory"
Expand Down Expand Up @@ -107,10 +107,10 @@ func GenIngestTempDataDir() (string, error) {
return sortPath, nil
}

// CleanUpTempDir is used to remove the stale index data. This function gets
// running DDL jobs from jobScheduler and it only removes the folders that
// related to finished jobs.
func CleanUpTempDir(path string, runningIDs map[int64]struct{}) {
// CleanUpTempDir is used to remove the stale index data. Caller will pass
// running DDL jobs from jobScheduler and it internally checks disktask system
// table to filter related jobs.
func CleanUpTempDir(ctx context.Context, path string, runningIDs map[int64]struct{}) {
entries, err := os.ReadDir(path)
if err != nil {
if strings.Contains(err.Error(), "no such file") {
Expand Down Expand Up @@ -144,35 +144,27 @@ func CleanUpTempDir(path string, runningIDs map[int64]struct{}) {
return
}

m, err2 := storage.GetTaskManager()
if err2 != nil {
logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err2))
return
}

for id := range toCheckJobIDs {
logutil.DDLIngestLogger().Info("remove stale temp index data",
zap.Int64("jobID", id))
p := filepath.Join(path, encodeBackendTag(id))
err = os.RemoveAll(p)
if err != nil {
logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err))
cnt, err3 := m.GetTaskCountByKeyPrefix(ctx, ddlutil.DistTaskKey(proto.Backfill, id))
if err3 != nil {
logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err3))
continue
}
}
}

func filterProcessingJobIDs(ctx context.Context, se *sess.Session, jobIDs []int64) ([]int64, error) {
var sb strings.Builder
for i, id := range jobIDs {
if i != 0 {
sb.WriteString(",")
if cnt == 0 {
logutil.DDLIngestLogger().Info("remove stale temp index data",
zap.Int64("jobID", id))
p := filepath.Join(path, encodeBackendTag(id))
err = os.RemoveAll(p)
if err != nil {
logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err))
}
}
sb.WriteString(strconv.FormatInt(id, 10))
}
sql := fmt.Sprintf(
"SELECT job_id FROM mysql.tidb_ddl_job WHERE job_id IN (%s) AND processing",
sb.String())
rows, err := se.Execute(ctx, sql, "filter_processing_job_ids")
if err != nil {
return nil, errors.Trace(err)
}
ret := make([]int64, 0, len(rows))
for _, row := range rows {
ret = append(ret, row.GetInt64(0))
}
return ret, nil
}
71 changes: 65 additions & 6 deletions pkg/ddl/ingest/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@
package ingest_test

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
)

func TestGenLightningDataDir(t *testing.T) {
Expand All @@ -37,29 +47,78 @@ func TestGenLightningDataDir(t *testing.T) {
}

func TestLitBackendCtxMgr(t *testing.T) {
ctx := context.Background()
ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
store := testkit.CreateMockStore(t)
sortPath := t.TempDir()
staleJobDir := filepath.Join(sortPath, "100")
staleJobDir2 := filepath.Join(sortPath, "101")
staleJobDir3 := filepath.Join(sortPath, "102")
err := os.MkdirAll(staleJobDir, 0o700)
require.NoError(t, err)
err = os.MkdirAll(staleJobDir2, 0o700)
require.NoError(t, err)
err = os.MkdirAll(staleJobDir3, 0o700)
require.NoError(t, err)
pool := pools.NewResourcePool(func() (pools.Resource, error) {
return testkit.NewSession(t, store), nil
}, 10, 10, time.Second)
t.Cleanup(func() {
pool.Close()
})
taskManager := storage.NewTaskManager(pool)
storage.SetTaskManager(taskManager)
t.Cleanup(func() {
storage.SetTaskManager(nil)
})

runningJobs := map[int64]struct{}{
100: {},
101: {},
}
taskKey101 := ddlutil.DistTaskKey(proto.Backfill, 101)
_, err = taskManager.CreateTask(
ctx,
taskKey101,
proto.Backfill,
1,
"",
nil,
)
require.NoError(t, err)
// multi-schema change task key
taskKey102 := ddlutil.DistTaskKey(proto.Backfill, 102) + "/1"
_, err = taskManager.CreateTask(
ctx,
taskKey102,
proto.Backfill,
1,
"",
nil,
)
require.NoError(t, err)

ingest.CleanUpTempDir(sortPath, runningJobs)
ingest.CleanUpTempDir(ctx, sortPath, runningJobs)
require.DirExists(t, staleJobDir)
require.DirExists(t, staleJobDir2)
require.DirExists(t, staleJobDir3)

delete(runningJobs, 101)
ingest.CleanUpTempDir(sortPath, runningJobs)
require.DirExists(t, staleJobDir)
ingest.CleanUpTempDir(ctx, sortPath, nil)
require.NoDirExists(t, staleJobDir)
require.DirExists(t, staleJobDir2)
require.DirExists(t, staleJobDir3)

// mimic task is moved to history table
tk := testkit.NewTestKit(t, store)
tk.MustExec(fmt.Sprintf("delete from mysql.tidb_global_task where task_key = '%s'", taskKey101))

ingest.CleanUpTempDir(ctx, sortPath, nil)
require.NoDirExists(t, staleJobDir)
require.NoDirExists(t, staleJobDir2)
require.DirExists(t, staleJobDir3)

ingest.CleanUpTempDir(sortPath, nil)
tk.MustExec(fmt.Sprintf("delete from mysql.tidb_global_task where task_key = '%s'", taskKey102))
ingest.CleanUpTempDir(ctx, sortPath, nil)
require.NoDirExists(t, staleJobDir)
require.NoDirExists(t, staleJobDir2)
require.NoDirExists(t, staleJobDir3)
}
1 change: 1 addition & 0 deletions pkg/ddl/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/logutil",
"//pkg/disttask/framework/proto",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/parser/terror",
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
Expand Down Expand Up @@ -425,3 +426,8 @@ func GenKeyExistsErr(key, value []byte, idxInfo *model.IndexInfo, tblInfo *model
}
return kv.GenKeyExistsErr(valueStr, indexName)
}

// DistTaskKey generates the key for the dist task.
func DistTaskKey(taskType proto.TaskType, jobID int64) string {
return fmt.Sprintf("ddl/%s/%d", taskType, jobID)
}
10 changes: 10 additions & 0 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ func (mgr *TaskManager) GetTaskByKey(ctx context.Context, key string) (task *pro
return Row2Task(rs[0]), nil
}

// GetTaskCountByKeyPrefix gets the task count by the task key prefix.
func (mgr *TaskManager) GetTaskCountByKeyPrefix(ctx context.Context, keyPrefix string) (int64, error) {
rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select count(*) from mysql.tidb_global_task t where task_key like %?", keyPrefix+"%")
if err != nil {
return 0, err
}

return rs[0].GetInt64(0), nil
}

// GetTaskByKeyWithHistory gets the task from history table by the task key.
func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error) {
rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+TaskColumns+" from mysql.tidb_global_task t where task_key = %?"+
Expand Down

0 comments on commit 86d027f

Please sign in to comment.