From a73b20fd79dc25be6bd293f5dcae205717ffcb7f Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 4 Jan 2024 19:13:38 +0800 Subject: [PATCH 1/6] change --- pkg/disttask/framework/storage/table_test.go | 11 ++-- pkg/disttask/framework/storage/task_table.go | 15 +++++ .../framework/storage/task_table_test.go | 12 ---- .../importinto/encode_and_sort_operator.go | 2 +- pkg/disttask/importinto/job.go | 4 +- pkg/disttask/importinto/task_executor.go | 2 +- pkg/executor/importer/BUILD.bazel | 6 +- pkg/executor/importer/import.go | 61 ++++++++++++++----- pkg/executor/importer/import_test.go | 10 ++- ...stkit_test.go => importer_testkit_test.go} | 34 +++++++++++ pkg/executor/importer/table_import.go | 2 +- 11 files changed, 113 insertions(+), 46 deletions(-) rename pkg/executor/importer/{table_import_testkit_test.go => importer_testkit_test.go} (62%) diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 99ac86e6bc30a..27f4e58dc73e9 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -816,13 +816,12 @@ func TestDistFrameworkMeta(t *testing.T) { _, sm, ctx := testutil.InitTableTest(t) // when no node - _, err := storage.GetCPUCountOfManagedNodes(ctx, sm) + _, err := sm.GetCPUCountOfManagedNodes(ctx) require.ErrorContains(t, err, "no managed nodes") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(0)")) require.NoError(t, sm.StartManager(ctx, ":4000", "background")) - cpuCount, err := storage.GetCPUCountOfManagedNodes(ctx, sm) - require.NoError(t, err) - require.Equal(t, 0, cpuCount) + cpuCount, err := sm.GetCPUCountOfManagedNodes(ctx) + require.ErrorContains(t, err, "no managed node have enough resource") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)")) require.NoError(t, sm.StartManager(ctx, ":4000", "background")) @@ -850,7 +849,7 @@ func TestDistFrameworkMeta(t *testing.T) { {ID: ":4002", Role: "background", CPUCount: 100}, {ID: ":4003", Role: "background", CPUCount: 100}, }, nodes) - cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm) + cpuCount, err = sm.GetCPUCountOfManagedNodes(ctx) require.NoError(t, err) require.Equal(t, 100, cpuCount) @@ -875,7 +874,7 @@ func TestDistFrameworkMeta(t *testing.T) { require.Equal(t, []proto.ManagedNode{ {ID: ":4001", Role: "", CPUCount: 8}, }, nodes) - cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm) + cpuCount, err = sm.GetCPUCountOfManagedNodes(ctx) require.NoError(t, err) require.Equal(t, 8, cpuCount) } diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 31c64968a9595..216f0f9493cf3 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -1241,7 +1241,19 @@ func (*TaskManager) getAllNodesWithSession(ctx context.Context, se sessionctx.Co return nodes, nil } +// GetCPUCountOfManagedNodes gets the cpu count of managed nodes. +func (stm *TaskManager) GetCPUCountOfManagedNodes(ctx context.Context) (int, error) { + var cnt int + err := stm.WithNewSession(func(se sessionctx.Context) error { + var err2 error + cnt, err2 = stm.getCPUCountOfManagedNodes(ctx, se) + return err2 + }) + return cnt, err +} + // getCPUCountOfManagedNodes gets the cpu count of managed nodes. +// returns error when there's no managed node or no node has valid cpu count. 
func (stm *TaskManager) getCPUCountOfManagedNodes(ctx context.Context, se sessionctx.Context) (int, error) { nodes, err := stm.getManagedNodesWithSession(ctx, se) if err != nil { @@ -1257,5 +1269,8 @@ func (stm *TaskManager) getCPUCountOfManagedNodes(ctx context.Context, se sessio break } } + if cpuCount == 0 { + return 0, errors.New("no managed node have enough resource") + } return cpuCount, nil } diff --git a/pkg/disttask/framework/storage/task_table_test.go b/pkg/disttask/framework/storage/task_table_test.go index daccd37c0829b..dec57601a2af4 100644 --- a/pkg/disttask/framework/storage/task_table_test.go +++ b/pkg/disttask/framework/storage/task_table_test.go @@ -15,13 +15,11 @@ package storage import ( - "context" "testing" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/testkit/testsetup" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -39,16 +37,6 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, opts...) } -func GetCPUCountOfManagedNodes(ctx context.Context, taskMgr *TaskManager) (int, error) { - var cnt int - err := taskMgr.WithNewSession(func(se sessionctx.Context) error { - var err2 error - cnt, err2 = taskMgr.getCPUCountOfManagedNodes(ctx, se) - return err2 - }) - return cnt, err -} - func TestSplitSubtasks(t *testing.T) { tm := &TaskManager{} subtasks := make([]*proto.Subtask, 0, 10) diff --git a/pkg/disttask/importinto/encode_and_sort_operator.go b/pkg/disttask/importinto/encode_and_sort_operator.go index 651dfd0ac04a6..8bf03e911afa5 100644 --- a/pkg/disttask/importinto/encode_and_sort_operator.go +++ b/pkg/disttask/importinto/encode_and_sort_operator.go @@ -89,7 +89,7 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, pool := workerpool.NewWorkerPool( "encodeAndSortOperator", util.ImportInto, - int(executor.taskMeta.Plan.ThreadCnt), + executor.taskMeta.Plan.ThreadCnt, func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { return newChunkWorker(ctx, op, indexMemorySizeLimit) }, diff --git a/pkg/disttask/importinto/job.go b/pkg/disttask/importinto/job.go index 5e45e48cebe5b..6617aa1184e9c 100644 --- a/pkg/disttask/importinto/job.go +++ b/pkg/disttask/importinto/job.go @@ -182,7 +182,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err SessionCtx: se, TaskKey: TaskKey(jobID), TaskType: proto.ImportInto, - ThreadCnt: int(plan.ThreadCnt), + ThreadCnt: plan.ThreadCnt, } p := planner.NewPlanner() taskID, err2 = p.Run(planCtx, logicalPlan) @@ -206,7 +206,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err ti.logger = ti.logger.With(zap.Int64("task-id", task.ID)) ti.logger.Info("job submitted to task queue", - zap.Int64("job-id", jobID), zap.Int64("thread-cnt", plan.ThreadCnt)) + zap.Int64("job-id", jobID), zap.Int("thread-cnt", plan.ThreadCnt)) return jobID, task, nil } diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index e50334b88f9d2..2f5e7f4420b0c 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -323,7 +323,7 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S 1*size.MB, 8*1024, onClose, - int(m.taskMeta.Plan.ThreadCnt), + m.taskMeta.Plan.ThreadCnt, false) } diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 
73436f97292e9..f10c5631971b9 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//br/pkg/utils", "//pkg/config", "//pkg/ddl/util", + "//pkg/disttask/framework/storage", "//pkg/expression", "//pkg/keyspace", "//pkg/kv", @@ -85,15 +86,15 @@ go_test( srcs = [ "chunk_process_testkit_test.go", "import_test.go", + "importer_testkit_test.go", "job_test.go", "precheck_test.go", "table_import_test.go", - "table_import_testkit_test.go", ], embed = [":importer"], flaky = True, race = "on", - shard_count = 21, + shard_count = 22, deps = [ "//br/pkg/errors", "//br/pkg/lightning/backend/encode", @@ -107,6 +108,7 @@ go_test( "//br/pkg/streamhelper", "//br/pkg/utils", "//pkg/config", + "//pkg/disttask/framework/testutil", "//pkg/expression", "//pkg/infoschema", "//pkg/kv", diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index df41fc850b91c..09ad0a9a00709 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -25,6 +25,7 @@ import ( "slices" "strings" "sync" + "time" "unicode/utf8" "github.com/pingcap/errors" @@ -37,6 +38,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" tidb "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/util" + disttaskstore "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/expression" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" @@ -59,6 +61,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/stringutil" kvconfig "github.com/tikv/client-go/v2/config" + tikvutil "github.com/tikv/client-go/v2/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -141,6 +144,8 @@ var ( ".zstd", ".zst", ".snappy", } + + getCPUCountTimeout = time.Minute ) // GetKVStore returns a kv.Storage. @@ -199,7 +204,7 @@ type Plan struct { DiskQuota config.ByteSize Checksum config.PostOpLevel - ThreadCnt int64 + ThreadCnt int MaxWriteSpeed config.ByteSize SplitFile bool MaxRecordedErrors int64 @@ -499,14 +504,11 @@ func (e *LoadDataController) checkFieldParams() error { return nil } -func (p *Plan) initDefaultOptions() { - // we're using cpu count of current node, not of framework managed nodes, - // but it seems more intuitive. 
- threadCnt := cpu.GetCPUCount()
- threadCnt = int(math.Max(1, float64(threadCnt)*0.5))
+func (p *Plan) initDefaultOptions(targetNodeCPUCnt int) {
+ threadCnt := int(math.Max(1, float64(targetNodeCPUCnt)*0.5))
 
 p.Checksum = config.OpLevelRequired
- p.ThreadCnt = int64(threadCnt)
+ p.ThreadCnt = threadCnt
 p.MaxWriteSpeed = unlimitedWriteSpeed
 p.SplitFile = false
 p.MaxRecordedErrors = 100
@@ -520,7 +522,11 @@ }
 
 func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error {
- p.initDefaultOptions()
+ targetNodeCPUCnt, err := GetTargetNodeCPUCnt(p.Path)
+ if err != nil {
+ return err
+ }
+ p.initDefaultOptions(targetNodeCPUCnt)
 
 specifiedOptions := map[string]*plannercore.LoadDataOpt{}
 for _, opt := range options {
@@ -637,7 +643,7 @@ if err != nil || vInt <= 0 {
 return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
 }
- p.ThreadCnt = vInt
+ p.ThreadCnt = int(vInt)
 }
 if opt, ok := specifiedOptions[maxWriteSpeedOption]; ok {
 v, err := optAsString(opt)
@@ -708,16 +714,15 @@ return exeerrors.ErrInvalidOptionVal.FastGenByArgs("skip_rows, should be <= 1 when split-file is enabled")
 }
 
- p.adjustOptions()
+ p.adjustOptions(targetNodeCPUCnt)
 return nil
 }
 
-func (p *Plan) adjustOptions() {
+func (p *Plan) adjustOptions(targetNodeCPUCnt int) {
 // max value is cpu-count
- numCPU := int64(runtime.GOMAXPROCS(0))
- if p.ThreadCnt > numCPU {
+ if p.ThreadCnt > targetNodeCPUCnt {
 log.L().Info("IMPORT INTO thread count is larger than cpu-count, set to cpu-count")
- p.ThreadCnt = numCPU
+ p.ThreadCnt = targetNodeCPUCnt
 }
 }
@@ -1252,7 +1257,7 @@ func (e *LoadDataController) getBackendWorkerConcurrency() int {
 // The real concurrency used is adjusted in external engine later.
 // when using local sort, use the default value as lightning.
 if e.IsGlobalSort() {
- return int(e.ThreadCnt) * 2
+ return e.ThreadCnt * 2
 }
 return config.DefaultRangeConcurrency * 2
 }
@@ -1335,5 +1340,31 @@ func GetMsgFromBRError(err error) string {
 return raw[:len(raw)-len(berrMsg)-len(": ")]
 }
 
+// GetTargetNodeCPUCnt gets the cpu count of the target node where the import into job will be executed.
+// the target node is the current node if it's a server-disk import or disttask is disabled,
+// otherwise it's the node managed by disttask.
+// exported for testing.
+func GetTargetNodeCPUCnt(path string) (int, error) {
+ u, err2 := storage.ParseRawURL(path)
+ if err2 != nil {
+ return 0, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
+ err2.Error())
+ }
+
+ serverDiskImport := storage.IsLocal(u)
+ if serverDiskImport || !variable.EnableDistTask.Load() {
+ return cpu.GetCPUCount(), nil
+ }
+ manager, err := disttaskstore.GetTaskManager()
+ if err != nil {
+ return 0, err
+ }
+ // the call path that initializes threadCnt doesn't have a context, so we use a timeout here.
+ ctx, cancel := context.WithTimeout(context.Background(), getCPUCountTimeout)
+ defer cancel()
+ ctx = tikvutil.WithInternalSourceType(ctx, tidbkv.InternalImportInto)
+ return manager.GetCPUCountOfManagedNodes(ctx)
+}
+
 // TestSyncCh is used in unit test to synchronize the execution. 
var TestSyncCh = make(chan struct{}) diff --git a/pkg/executor/importer/import_test.go b/pkg/executor/importer/import_test.go index db25955c080aa..0dce9a0f60874 100644 --- a/pkg/executor/importer/import_test.go +++ b/pkg/executor/importer/import_test.go @@ -47,12 +47,11 @@ import ( func TestInitDefaultOptions(t *testing.T) { plan := &Plan{} - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(1)")) variable.CloudStorageURI.Store("s3://bucket/path") t.Cleanup(func() { variable.CloudStorageURI.Store("") }) - plan.initDefaultOptions() + plan.initDefaultOptions(1) require.Equal(t, config.ByteSize(0), plan.DiskQuota) require.Equal(t, config.OpLevelRequired, plan.Checksum) require.Equal(t, int64(1), plan.ThreadCnt) @@ -65,8 +64,7 @@ func TestInitDefaultOptions(t *testing.T) { require.Equal(t, config.ByteSize(defaultMaxEngineSize), plan.MaxEngineSize) require.Equal(t, "s3://bucket/path", plan.CloudStorageURI) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(10)")) - plan.initDefaultOptions() + plan.initDefaultOptions(10) require.Equal(t, int64(5), plan.ThreadCnt) } @@ -173,8 +171,8 @@ func TestAdjustOptions(t *testing.T) { ThreadCnt: 100000000, MaxWriteSpeed: 10, } - plan.adjustOptions() - require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt) + plan.adjustOptions(16) + require.Equal(t, 16, plan.ThreadCnt) require.Equal(t, config.ByteSize(10), plan.MaxWriteSpeed) // not adjusted } diff --git a/pkg/executor/importer/table_import_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go similarity index 62% rename from pkg/executor/importer/table_import_testkit_test.go rename to pkg/executor/importer/importer_testkit_test.go index 570d0737fb94f..33abc9a60322b 100644 --- a/pkg/executor/importer/table_import_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -22,9 +22,12 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/failpoint" verify "github.com/pingcap/tidb/br/pkg/lightning/verification" + "github.com/pingcap/tidb/pkg/disttask/framework/testutil" "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/require" ) @@ -65,3 +68,34 @@ func TestChecksumTable(t *testing.T) { require.NoError(t, err) require.True(t, remoteChecksum.IsEqual(&localChecksum)) } + +func TestGetTargetNodeCpuCnt(t *testing.T) { + _, tm, ctx := testutil.InitTableTest(t) + require.False(t, variable.EnableDistTask.Load()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)")) + t.Cleanup(func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu")) + variable.EnableDistTask.Store(false) + }) + require.NoError(t, tm.StartManager(ctx, "tidb1", "")) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")) + // invalid path + _, err := importer.GetTargetNodeCPUCnt(":xx") + require.ErrorIs(t, err, exeerrors.ErrLoadDataInvalidURI) + // server disk import + targetNodeCPUCnt, err := importer.GetTargetNodeCPUCnt("/path/to/xxx.csv") + require.NoError(t, err) + require.Equal(t, 8, targetNodeCPUCnt) + // disttask disabled + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt("s3://path/to/xxx.csv") + require.NoError(t, err) + 
require.Equal(t, 8, targetNodeCPUCnt) + // disttask enabled + variable.EnableDistTask.Store(true) + + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt("s3://path/to/xxx.csv") + require.NoError(t, err) + require.Equal(t, 16, targetNodeCPUCnt) +} diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 91844ceacb28e..2c3259febfc7b 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -382,7 +382,7 @@ func (e *LoadDataController) PopulateChunks(ctx context.Context) (ecp map[int32] ColumnCnt: len(e.Table.Meta().Columns), EngineDataSize: adjustedMaxEngineSize, MaxChunkSize: int64(config.MaxRegionSize), - Concurrency: int(e.ThreadCnt), + Concurrency: e.ThreadCnt, IOWorkers: nil, Store: e.dataStore, TableMeta: tableMeta, From 3b4fe56d7b314c08265851d255850306050d4ebe Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 4 Jan 2024 19:26:15 +0800 Subject: [PATCH 2/6] change --- pkg/ddl/BUILD.bazel | 1 - pkg/ddl/index.go | 9 +++++---- pkg/disttask/framework/handle/handle.go | 9 +++++++++ pkg/disttask/framework/storage/table_test.go | 8 ++++---- pkg/disttask/framework/storage/task_table.go | 12 ++++++------ pkg/executor/importer/BUILD.bazel | 2 +- pkg/executor/importer/import.go | 8 ++------ 7 files changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 0aa8b3638917f..417058edf04f6 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -129,7 +129,6 @@ go_library( "//pkg/util/chunk", "//pkg/util/codec", "//pkg/util/collate", - "//pkg/util/cpu", "//pkg/util/dbterror", "//pkg/util/dbterror/exeerrors", "//pkg/util/domainutil", diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 6f9e53a0c20f4..a8e3d1d3403a3 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -59,7 +59,6 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/cpu" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/logutil" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" @@ -2118,9 +2117,11 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error { job := reorgInfo.Job workerCntLimit := int(variable.GetDDLReorgWorkerCounter()) - // we're using cpu count of current node, not of framework managed nodes, - // but it seems more intuitive. - concurrency := min(workerCntLimit, cpu.GetCPUCount()) + cpuCount, err := handle.GetCPUCountOfManagedNode(ctx) + if err != nil { + return err + } + concurrency := min(workerCntLimit, cpuCount) logutil.BgLogger().Info("adjusted add-index task concurrency", zap.Int("worker-cnt", workerCntLimit), zap.Int("task-concurrency", concurrency), zap.String("task-key", taskKey)) diff --git a/pkg/disttask/framework/handle/handle.go b/pkg/disttask/framework/handle/handle.go index ecac8fb8a92fd..b774fd7aa01bf 100644 --- a/pkg/disttask/framework/handle/handle.go +++ b/pkg/disttask/framework/handle/handle.go @@ -45,6 +45,15 @@ func NotifyTaskChange() { } } +// GetCPUCountOfManagedNode gets the CPU count of the managed node. +func GetCPUCountOfManagedNode(ctx context.Context) (int, error) { + manager, err := storage.GetTaskManager() + if err != nil { + return 0, err + } + return manager.GetCPUCountOfManagedNode(ctx) +} + // SubmitTask submits a task. 
func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, taskMeta []byte) (*proto.Task, error) { taskManager, err := storage.GetTaskManager() diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 27f4e58dc73e9..2b9bc04bfe82f 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -816,11 +816,11 @@ func TestDistFrameworkMeta(t *testing.T) { _, sm, ctx := testutil.InitTableTest(t) // when no node - _, err := sm.GetCPUCountOfManagedNodes(ctx) + _, err := sm.GetCPUCountOfManagedNode(ctx) require.ErrorContains(t, err, "no managed nodes") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(0)")) require.NoError(t, sm.StartManager(ctx, ":4000", "background")) - cpuCount, err := sm.GetCPUCountOfManagedNodes(ctx) + cpuCount, err := sm.GetCPUCountOfManagedNode(ctx) require.ErrorContains(t, err, "no managed node have enough resource") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)")) @@ -849,7 +849,7 @@ func TestDistFrameworkMeta(t *testing.T) { {ID: ":4002", Role: "background", CPUCount: 100}, {ID: ":4003", Role: "background", CPUCount: 100}, }, nodes) - cpuCount, err = sm.GetCPUCountOfManagedNodes(ctx) + cpuCount, err = sm.GetCPUCountOfManagedNode(ctx) require.NoError(t, err) require.Equal(t, 100, cpuCount) @@ -874,7 +874,7 @@ func TestDistFrameworkMeta(t *testing.T) { require.Equal(t, []proto.ManagedNode{ {ID: ":4001", Role: "", CPUCount: 8}, }, nodes) - cpuCount, err = sm.GetCPUCountOfManagedNodes(ctx) + cpuCount, err = sm.GetCPUCountOfManagedNode(ctx) require.NoError(t, err) require.Equal(t, 8, cpuCount) } diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 216f0f9493cf3..64f420c8ab2fc 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -231,7 +231,7 @@ func (stm *TaskManager) CreateTask(ctx context.Context, key string, tp proto.Tas // CreateTaskWithSession adds a new task to task table with session. func (stm *TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) { - cpuCount, err := stm.getCPUCountOfManagedNodes(ctx, se) + cpuCount, err := stm.getCPUCountOfManagedNode(ctx, se) if err != nil { return 0, err } @@ -1241,20 +1241,20 @@ func (*TaskManager) getAllNodesWithSession(ctx context.Context, se sessionctx.Co return nodes, nil } -// GetCPUCountOfManagedNodes gets the cpu count of managed nodes. -func (stm *TaskManager) GetCPUCountOfManagedNodes(ctx context.Context) (int, error) { +// GetCPUCountOfManagedNode gets the cpu count of managed node. +func (stm *TaskManager) GetCPUCountOfManagedNode(ctx context.Context) (int, error) { var cnt int err := stm.WithNewSession(func(se sessionctx.Context) error { var err2 error - cnt, err2 = stm.getCPUCountOfManagedNodes(ctx, se) + cnt, err2 = stm.getCPUCountOfManagedNode(ctx, se) return err2 }) return cnt, err } -// getCPUCountOfManagedNodes gets the cpu count of managed nodes. +// getCPUCountOfManagedNode gets the cpu count of managed node. // returns error when there's no managed node or no node has valid cpu count. 
-func (stm *TaskManager) getCPUCountOfManagedNodes(ctx context.Context, se sessionctx.Context) (int, error) {
+func (stm *TaskManager) getCPUCountOfManagedNode(ctx context.Context, se sessionctx.Context) (int, error) {
 nodes, err := stm.getManagedNodesWithSession(ctx, se)
 if err != nil {
 return 0, err
diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel
index f10c5631971b9..33cba79b31ef4 100644
--- a/pkg/executor/importer/BUILD.bazel
+++ b/pkg/executor/importer/BUILD.bazel
@@ -32,7 +32,7 @@ go_library(
 "//br/pkg/utils",
 "//pkg/config",
 "//pkg/ddl/util",
- "//pkg/disttask/framework/storage",
+ "//pkg/disttask/framework/handle",
 "//pkg/expression",
 "//pkg/keyspace",
 "//pkg/kv",
diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go
index 09ad0a9a00709..e863cf573302c 100644
--- a/pkg/executor/importer/import.go
+++ b/pkg/executor/importer/import.go
@@ -38,7 +38,7 @@ import (
 "github.com/pingcap/tidb/br/pkg/storage"
 tidb "github.com/pingcap/tidb/pkg/config"
 "github.com/pingcap/tidb/pkg/ddl/util"
- disttaskstore "github.com/pingcap/tidb/pkg/disttask/framework/storage"
+ "github.com/pingcap/tidb/pkg/disttask/framework/handle"
 "github.com/pingcap/tidb/pkg/expression"
 tidbkv "github.com/pingcap/tidb/pkg/kv"
 "github.com/pingcap/tidb/pkg/parser"
@@ -1355,15 +1355,11 @@ func GetTargetNodeCPUCnt(path string) (int, error) {
 if serverDiskImport || !variable.EnableDistTask.Load() {
 return cpu.GetCPUCount(), nil
 }
- manager, err := disttaskstore.GetTaskManager()
- if err != nil {
- return 0, err
- }
 // the call path that initializes threadCnt doesn't have a context, so we use a timeout here.
 ctx, cancel := context.WithTimeout(context.Background(), getCPUCountTimeout)
 defer cancel()
 ctx = tikvutil.WithInternalSourceType(ctx, tidbkv.InternalImportInto)
- return manager.GetCPUCountOfManagedNodes(ctx)
+ return handle.GetCPUCountOfManagedNode(ctx)
 }
 
 // TestSyncCh is used in unit test to synchronize the execution. 
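Taken together, patches 1 and 2 make the default IMPORT INTO thread count depend on the CPU count of the node that will actually run the job, and expose that lookup through handle.GetCPUCountOfManagedNode so the DDL add-index path can reuse it. The standalone Go sketch below illustrates the derivation; it is an illustration only, not code from this series: node, cpuCountOfManagedNode, and defaultThreadCnt are hypothetical stand-ins for proto.ManagedNode and the patched TaskManager/Plan methods, and since the hunks above elide part of the selection loop, the "first node reporting a positive count" choice here is an assumption.

package main

import (
	"errors"
	"fmt"
	"math"
)

// node is a simplified stand-in for proto.ManagedNode.
type node struct {
	ID       string
	CPUCount int
}

// cpuCountOfManagedNode sketches TaskManager.GetCPUCountOfManagedNode as
// patched above: error out when there are no managed nodes, otherwise take a
// node's reported CPU count (assumed here: the first positive one), and error
// out when no node reports a usable count.
func cpuCountOfManagedNode(nodes []node) (int, error) {
	if len(nodes) == 0 {
		return 0, errors.New("no managed nodes")
	}
	cpuCount := 0
	for _, n := range nodes {
		if n.CPUCount > 0 {
			cpuCount = n.CPUCount
			break
		}
	}
	if cpuCount == 0 {
		return 0, errors.New("no managed node have enough resource")
	}
	return cpuCount, nil
}

// defaultThreadCnt mirrors Plan.initDefaultOptions in patch 1: half the
// target node's CPUs, with a floor of one worker.
func defaultThreadCnt(targetNodeCPUCnt int) int {
	return int(math.Max(1, float64(targetNodeCPUCnt)*0.5))
}

func main() {
	cnt, err := cpuCountOfManagedNode([]node{{ID: ":4000", CPUCount: 16}})
	if err != nil {
		panic(err)
	}
	fmt.Println(defaultThreadCnt(cnt)) // prints 8
}

The derivation matches TestInitDefaultOptions in patch 1, where 1 CPU yields a thread count of 1 and 10 CPUs yield 5.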
From 058bb613a828b1ae87847fe412e3010f51f5af9f Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 4 Jan 2024 21:32:52 +0800 Subject: [PATCH 3/6] change --- pkg/disttask/framework/storage/table_test.go | 4 +-- pkg/executor/import_into.go | 27 +++++++++++-------- pkg/executor/importer/import.go | 15 ++++------- pkg/executor/importer/import_test.go | 25 +++++++++-------- .../importer/importer_testkit_test.go | 8 +++--- 5 files changed, 41 insertions(+), 38 deletions(-) diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 2b9bc04bfe82f..41b47238b2076 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -820,7 +820,7 @@ func TestDistFrameworkMeta(t *testing.T) { require.ErrorContains(t, err, "no managed nodes") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(0)")) require.NoError(t, sm.StartManager(ctx, ":4000", "background")) - cpuCount, err := sm.GetCPUCountOfManagedNode(ctx) + _, err = sm.GetCPUCountOfManagedNode(ctx) require.ErrorContains(t, err, "no managed node have enough resource") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)")) @@ -849,7 +849,7 @@ func TestDistFrameworkMeta(t *testing.T) { {ID: ":4002", Role: "background", CPUCount: 100}, {ID: ":4003", Role: "background", CPUCount: 100}, }, nodes) - cpuCount, err = sm.GetCPUCountOfManagedNode(ctx) + cpuCount, err := sm.GetCPUCountOfManagedNode(ctx) require.NoError(t, err) require.Equal(t, 100, cpuCount) diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go index 7711ac5f9e665..60844e37a9100 100644 --- a/pkg/executor/import_into.go +++ b/pkg/executor/import_into.go @@ -63,6 +63,8 @@ type ImportIntoExec struct { controller *importer.LoadDataController stmt string + plan *plannercore.ImportInto + tbl table.Table dataFilled bool } @@ -72,21 +74,12 @@ var ( func newImportIntoExec(b exec.BaseExecutor, userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) ( *ImportIntoExec, error) { - importPlan, err := importer.NewImportPlan(userSctx, plan, tbl) - if err != nil { - return nil, err - } - astArgs := importer.ASTArgsFromImportPlan(plan) - controller, err := importer.NewLoadDataController(importPlan, tbl, astArgs) - if err != nil { - return nil, err - } return &ImportIntoExec{ BaseExecutor: b, userSctx: userSctx, - importPlan: importPlan, - controller: controller, stmt: plan.Stmt, + plan: plan, + tbl: tbl, }, nil } @@ -98,6 +91,18 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) // need to return an empty req to indicate all results have been written return nil } + importPlan, err := importer.NewImportPlan(ctx, e.userSctx, e.plan, e.tbl) + if err != nil { + return err + } + astArgs := importer.ASTArgsFromImportPlan(e.plan) + controller, err := importer.NewLoadDataController(importPlan, e.tbl, astArgs) + if err != nil { + return err + } + e.importPlan = importPlan + e.controller = controller + if err2 := e.controller.InitDataFiles(ctx); err2 != nil { return err2 } diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index e863cf573302c..c6a0f440cde0b 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -61,7 +61,6 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/stringutil" kvconfig "github.com/tikv/client-go/v2/config" - tikvutil 
"github.com/tikv/client-go/v2/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -352,7 +351,7 @@ func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.Load } // NewImportPlan creates a new import into plan. -func NewImportPlan(userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (*Plan, error) { +func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (*Plan, error) { var format string if plan.Format != nil { format = strings.ToLower(*plan.Format) @@ -391,7 +390,7 @@ func NewImportPlan(userSctx sessionctx.Context, plan *plannercore.ImportInto, tb InImportInto: true, User: userSctx.GetSessionVars().User.String(), } - if err := p.initOptions(userSctx, plan.Options); err != nil { + if err := p.initOptions(ctx, userSctx, plan.Options); err != nil { return nil, err } if err := p.initParameters(plan); err != nil { @@ -521,8 +520,8 @@ func (p *Plan) initDefaultOptions(targetNodeCPUCnt int) { p.Charset = &v } -func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error { - targetNodeCPUCnt, err := GetTargetNodeCPUCnt(p.Path) +func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error { + targetNodeCPUCnt, err := GetTargetNodeCPUCnt(ctx, p.Path) if err != nil { return err } @@ -1344,7 +1343,7 @@ func GetMsgFromBRError(err error) string { // target node is current node if it's server-disk import or disttask is disabled, // else it's the node managed by disttask. // exported for testing. -func GetTargetNodeCPUCnt(path string) (int, error) { +func GetTargetNodeCPUCnt(ctx context.Context, path string) (int, error) { u, err2 := storage.ParseRawURL(path) if err2 != nil { return 0, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource, @@ -1355,10 +1354,6 @@ func GetTargetNodeCPUCnt(path string) (int, error) { if serverDiskImport || !variable.EnableDistTask.Load() { return cpu.GetCPUCount(), nil } - // the call path of initialization of threadCnt don't have context, so we use a timeout here. 
- ctx, cancel := context.WithTimeout(context.Background(), getCPUCountTimeout) - defer cancel() - ctx = tikvutil.WithInternalSourceType(ctx, tidbkv.InternalImportInto) return handle.GetCPUCountOfManagedNode(ctx) } diff --git a/pkg/executor/importer/import_test.go b/pkg/executor/importer/import_test.go index 0dce9a0f60874..2d88ed82f73ab 100644 --- a/pkg/executor/importer/import_test.go +++ b/pkg/executor/importer/import_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/pkg/expression" + tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" plannercore "github.com/pingcap/tidb/pkg/planner/core" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" + tikvutil "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -54,7 +56,7 @@ func TestInitDefaultOptions(t *testing.T) { plan.initDefaultOptions(1) require.Equal(t, config.ByteSize(0), plan.DiskQuota) require.Equal(t, config.OpLevelRequired, plan.Checksum) - require.Equal(t, int64(1), plan.ThreadCnt) + require.Equal(t, 1, plan.ThreadCnt) require.Equal(t, unlimitedWriteSpeed, plan.MaxWriteSpeed) require.Equal(t, false, plan.SplitFile) require.Equal(t, int64(100), plan.MaxRecordedErrors) @@ -65,13 +67,14 @@ func TestInitDefaultOptions(t *testing.T) { require.Equal(t, "s3://bucket/path", plan.CloudStorageURI) plan.initDefaultOptions(10) - require.Equal(t, int64(5), plan.ThreadCnt) + require.Equal(t, 5, plan.ThreadCnt) } // for negative case see TestImportIntoOptionsNegativeCase func TestInitOptionsPositiveCase(t *testing.T) { - ctx := mock.NewContext() - defer ctx.Close() + sctx := mock.NewContext() + defer sctx.Close() + ctx := tikvutil.WithInternalSourceType(context.Background(), tidbkv.InternalImportInto) convertOptions := func(inOptions []*ast.LoadDataOpt) []*plannercore.LoadDataOpt { options := []*plannercore.LoadDataOpt{} @@ -79,7 +82,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { for _, opt := range inOptions { loadDataOpt := plannercore.LoadDataOpt{Name: opt.Name} if opt.Value != nil { - loadDataOpt.Value, err = expression.RewriteSimpleExprWithNames(ctx, opt.Value, nil, nil) + loadDataOpt.Value, err = expression.RewriteSimpleExprWithNames(sctx, opt.Value, nil, nil) require.NoError(t, err) } options = append(options, &loadDataOpt) @@ -109,7 +112,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { stmt, err := p.ParseOneStmt(sql, "", "") require.NoError(t, err, sql) plan := &Plan{Format: DataFormatCSV} - err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + err = plan.initOptions(ctx, sctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) require.NoError(t, err, sql) require.Equal(t, "utf8", *plan.Charset, sql) require.Equal(t, "aaa", plan.FieldsTerminatedBy, sql) @@ -120,7 +123,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { require.Equal(t, uint64(1), plan.IgnoreLines, sql) require.Equal(t, config.ByteSize(100<<30), plan.DiskQuota, sql) require.Equal(t, config.OpLevelOptional, plan.Checksum, sql) - require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs + require.Equal(t, runtime.GOMAXPROCS(0), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs require.Equal(t, config.ByteSize(200<<20), plan.MaxWriteSpeed, sql) require.True(t, plan.SplitFile, sql) require.Equal(t, 
int64(123), plan.MaxRecordedErrors, sql) @@ -135,7 +138,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { variable.CloudStorageURI.Store("") }) plan = &Plan{Format: DataFormatCSV} - err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + err = plan.initOptions(ctx, sctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) require.NoError(t, err, sql) require.Equal(t, "s3://bucket/path", plan.CloudStorageURI, sql) @@ -144,7 +147,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { stmt, err = p.ParseOneStmt(sql2, "", "") require.NoError(t, err, sql2) plan = &Plan{Format: DataFormatCSV} - err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + err = plan.initOptions(ctx, sctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) require.NoError(t, err, sql2) require.Equal(t, "s3://bucket/path2", plan.CloudStorageURI, sql2) // override with gs @@ -152,7 +155,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { stmt, err = p.ParseOneStmt(sql3, "", "") require.NoError(t, err, sql3) plan = &Plan{Format: DataFormatCSV} - err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + err = plan.initOptions(ctx, sctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) require.NoError(t, err, sql3) require.Equal(t, "gs://bucket/path2", plan.CloudStorageURI, sql3) // override with empty string, force use local sort @@ -160,7 +163,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { stmt, err = p.ParseOneStmt(sql4, "", "") require.NoError(t, err, sql4) plan = &Plan{Format: DataFormatCSV} - err = plan.initOptions(ctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) + err = plan.initOptions(ctx, sctx, convertOptions(stmt.(*ast.ImportIntoStmt).Options)) require.NoError(t, err, sql4) require.Equal(t, "", plan.CloudStorageURI, sql4) } diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index 33abc9a60322b..5e156e920617a 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -82,20 +82,20 @@ func TestGetTargetNodeCpuCnt(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")) // invalid path - _, err := importer.GetTargetNodeCPUCnt(":xx") + _, err := importer.GetTargetNodeCPUCnt(ctx, ":xx") require.ErrorIs(t, err, exeerrors.ErrLoadDataInvalidURI) // server disk import - targetNodeCPUCnt, err := importer.GetTargetNodeCPUCnt("/path/to/xxx.csv") + targetNodeCPUCnt, err := importer.GetTargetNodeCPUCnt(ctx, "/path/to/xxx.csv") require.NoError(t, err) require.Equal(t, 8, targetNodeCPUCnt) // disttask disabled - targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt("s3://path/to/xxx.csv") + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, "s3://path/to/xxx.csv") require.NoError(t, err) require.Equal(t, 8, targetNodeCPUCnt) // disttask enabled variable.EnableDistTask.Store(true) - targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt("s3://path/to/xxx.csv") + targetNodeCPUCnt, err = importer.GetTargetNodeCPUCnt(ctx, "s3://path/to/xxx.csv") require.NoError(t, err) require.Equal(t, 16, targetNodeCPUCnt) } From 71b53cf34ffc12eb0111e120f1506fdb4ac3f73d Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 9 Jan 2024 14:01:53 +0800 Subject: [PATCH 4/6] Update pkg/disttask/framework/storage/task_table.go Co-authored-by: EasonBall <592838129@qq.com> --- pkg/disttask/framework/storage/task_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 64f420c8ab2fc..7a14fb46e47f5 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -1270,7 +1270,7 @@ func (stm *TaskManager) getCPUCountOfManagedNode(ctx context.Context, se session } } if cpuCount == 0 { - return 0, errors.New("no managed node have enough resource") + return 0, errors.New("no managed node have enough resource for dist task") } return cpuCount, nil } From 8886cfcf8d54183c958aeb94b9e5ab496b1676e3 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 9 Jan 2024 14:30:09 +0800 Subject: [PATCH 5/6] fix comment --- pkg/executor/importer/import.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index c6a0f440cde0b..ea282de3b6b8d 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -25,7 +25,6 @@ import ( "slices" "strings" "sync" - "time" "unicode/utf8" "github.com/pingcap/errors" @@ -143,8 +142,6 @@ var ( ".zstd", ".zst", ".snappy", } - - getCPUCountTimeout = time.Minute ) // GetKVStore returns a kv.Storage. From 09e0050e9905d059e29af1e44ea187ac05c2d9fe Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 9 Jan 2024 14:55:33 +0800 Subject: [PATCH 6/6] fix conflict code --- pkg/executor/importer/importer_testkit_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index 5e156e920617a..fbc982d6789ab 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -78,7 +78,7 @@ func TestGetTargetNodeCpuCnt(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu")) variable.EnableDistTask.Store(false) }) - require.NoError(t, tm.StartManager(ctx, "tidb1", "")) + require.NoError(t, tm.InitMeta(ctx, "tidb1", "")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")) // invalid path
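For reference, the target-node CPU lookup the series converges on after patch 3 (which threads the statement's context through NewImportPlan and initOptions instead of creating one with a timeout) reduces to the sketch below. It is illustrative only: targetNodeCPUCnt, managedNodeCPUCount, and distTaskEnabled are hypothetical names, and url.Parse, the scheme check, runtime.NumCPU, and the fixed managed-node count stand in for storage.ParseRawURL, storage.IsLocal, cpu.GetCPUCount, and handle.GetCPUCountOfManagedNode.

package main

import (
	"context"
	"fmt"
	"net/url"
	"runtime"
)

// distTaskEnabled stands in for variable.EnableDistTask.Load().
var distTaskEnabled = false

// managedNodeCPUCount stands in for handle.GetCPUCountOfManagedNode.
func managedNodeCPUCount(ctx context.Context) (int, error) { return 16, nil }

// targetNodeCPUCnt sketches GetTargetNodeCPUCnt after patch 3: server-disk
// imports (a local path) and disabled disttask use the current node's CPU
// count; otherwise the job runs on a disttask-managed node, so that node's
// CPU count is used.
func targetNodeCPUCnt(ctx context.Context, path string) (int, error) {
	u, err := url.Parse(path)
	if err != nil {
		return 0, err
	}
	serverDiskImport := u.Scheme == "" || u.Scheme == "file"
	if serverDiskImport || !distTaskEnabled {
		return runtime.NumCPU(), nil
	}
	return managedNodeCPUCount(ctx)
}

func main() {
	local, _ := targetNodeCPUCnt(context.Background(), "/path/to/xxx.csv")
	distTaskEnabled = true
	remote, _ := targetNodeCPUCnt(context.Background(), "s3://path/to/xxx.csv")
	fmt.Println(local, remote) // current node's CPU count, then 16
}

This is the behavior TestGetTargetNodeCpuCnt exercises above: 8 for a local path or while disttask is disabled, and the managed node's 16 once variable.EnableDistTask is turned on.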