
import/index: adjust concurrency using cpu-count of managed node depends on use case #50091

Merged
merged 10 commits on Jan 10, 2024
1 change: 0 additions & 1 deletion pkg/ddl/BUILD.bazel
@@ -131,7 +131,6 @@ go_library(
 	"//pkg/util/chunk",
 	"//pkg/util/codec",
 	"//pkg/util/collate",
-	"//pkg/util/cpu",
 	"//pkg/util/dbterror",
 	"//pkg/util/dbterror/exeerrors",
 	"//pkg/util/domainutil",
9 changes: 5 additions & 4 deletions pkg/ddl/index.go
@@ -59,7 +59,6 @@ import (
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/backoff"
 	"github.com/pingcap/tidb/pkg/util/chunk"
-	"github.com/pingcap/tidb/pkg/util/cpu"
 	"github.com/pingcap/tidb/pkg/util/dbterror"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
@@ -2118,9 +2117,11 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {

 	job := reorgInfo.Job
 	workerCntLimit := int(variable.GetDDLReorgWorkerCounter())
-	// we're using cpu count of current node, not of framework managed nodes,
-	// but it seems more intuitive.
-	concurrency := min(workerCntLimit, cpu.GetCPUCount())
+	cpuCount, err := handle.GetCPUCountOfManagedNode(ctx)
+	if err != nil {
+		return err
+	}
+	concurrency := min(workerCntLimit, cpuCount)
 	logutil.BgLogger().Info("adjusted add-index task concurrency",
 		zap.Int("worker-cnt", workerCntLimit), zap.Int("task-concurrency", concurrency),
 		zap.String("task-key", taskKey))

Review thread on lines +2121 to +2123:

Contributor: retryable?

Contributor (author): it happens before creating the task; the newly added errors seem to need manual checking: "no managed nodes" and "no managed node have enough resource".
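For readers following the review thread above: both new failure modes are plain errors.New values, so a caller that wanted to treat them differently (for example, to decide whether retrying could ever succeed) would have to match on the message. A minimal sketch, assuming the error strings stay exactly as this PR introduces them:

	cpuCount, err := handle.GetCPUCountOfManagedNode(ctx)
	if err != nil {
		switch {
		case strings.Contains(err.Error(), "no managed nodes"):
			// No TiDB node has registered for dist-task execution;
			// retrying only helps once a node joins.
		case strings.Contains(err.Error(), "no managed node have enough resource"):
			// Nodes exist, but none reported a usable CPU count.
		}
		return err // executeDistTask fails the job either way
	}
	concurrency := min(workerCntLimit, cpuCount)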
9 changes: 9 additions & 0 deletions pkg/disttask/framework/handle/handle.go
@@ -45,6 +45,15 @@ func NotifyTaskChange() {
 	}
 }

+// GetCPUCountOfManagedNode gets the CPU count of the managed node.
+func GetCPUCountOfManagedNode(ctx context.Context) (int, error) {
+	manager, err := storage.GetTaskManager()
+	if err != nil {
+		return 0, err
+	}
+	return manager.GetCPUCountOfManagedNode(ctx)
+}
+
 // SubmitTask submits a task.
 func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, taskMeta []byte) (*proto.Task, error) {
 	taskManager, err := storage.GetTaskManager()
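This wrapper gives callers outside the storage package (like the DDL path above) one-call access to the managed-node CPU count. A usage sketch pairing it with SubmitTask; requestedThreads, taskKey, and taskMeta are placeholder names:

	cpuCount, err := handle.GetCPUCountOfManagedNode(ctx)
	if err != nil {
		return err
	}
	// Cap the requested concurrency by what a managed node can actually run.
	concurrency := min(requestedThreads, cpuCount)
	task, err := handle.SubmitTask(ctx, taskKey, proto.ImportInto, concurrency, taskMeta)
	if err != nil {
		return err
	}
	_ = task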
11 changes: 5 additions & 6 deletions pkg/disttask/framework/storage/table_test.go
@@ -827,13 +827,12 @@ func TestDistFrameworkMeta(t *testing.T) {
 	_, sm, ctx := testutil.InitTableTest(t)

 	// when no node
-	_, err := storage.GetCPUCountOfManagedNodes(ctx, sm)
+	_, err := sm.GetCPUCountOfManagedNode(ctx)
 	require.ErrorContains(t, err, "no managed nodes")
 	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(0)"))
 	require.NoError(t, sm.InitMeta(ctx, ":4000", "background"))
-	cpuCount, err := storage.GetCPUCountOfManagedNodes(ctx, sm)
-	require.NoError(t, err)
-	require.Equal(t, 0, cpuCount)
+	_, err = sm.GetCPUCountOfManagedNode(ctx)
+	require.ErrorContains(t, err, "no managed node have enough resource")

 	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(100)"))
 	require.NoError(t, sm.InitMeta(ctx, ":4000", "background"))
@@ -860,7 +859,7 @@ func TestDistFrameworkMeta(t *testing.T) {
 		{ID: ":4002", Role: "", CPUCount: 100},
 		{ID: ":4003", Role: "background", CPUCount: 100},
 	}, nodes)
-	cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm)
+	cpuCount, err := sm.GetCPUCountOfManagedNode(ctx)
 	require.NoError(t, err)
 	require.Equal(t, 100, cpuCount)

@@ -887,7 +886,7 @@ func TestDistFrameworkMeta(t *testing.T) {
 	require.Equal(t, []proto.ManagedNode{
 		{ID: ":4001", Role: "", CPUCount: 8},
 	}, nodes)
-	cpuCount, err = storage.GetCPUCountOfManagedNodes(ctx, sm)
+	cpuCount, err = sm.GetCPUCountOfManagedNode(ctx)
 	require.NoError(t, err)
 	require.Equal(t, 8, cpuCount)
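The test controls the reported CPU count through the mockNumCpu failpoint. For reference, the usual enable/disable pattern in an isolated test looks like this (a sketch; the test above deliberately re-enables the failpoint with different values across stages instead):

	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)"))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
	}()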
21 changes: 18 additions & 3 deletions pkg/disttask/framework/storage/task_table.go
@@ -231,7 +231,7 @@ func (mgr *TaskManager) CreateTask(ctx context.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) {

 // CreateTaskWithSession adds a new task to task table with session.
 func (mgr *TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) {
-	cpuCount, err := mgr.getCPUCountOfManagedNodes(ctx, se)
+	cpuCount, err := mgr.getCPUCountOfManagedNode(ctx, se)
 	if err != nil {
 		return 0, err
 	}
@@ -1257,8 +1257,20 @@ func (*TaskManager) getAllNodesWithSession(ctx context.Context, se sessionctx.Context) ([]proto.ManagedNode, error) {
 	return nodes, nil
 }

-// getCPUCountOfManagedNodes gets the cpu count of managed nodes.
-func (mgr *TaskManager) getCPUCountOfManagedNodes(ctx context.Context, se sessionctx.Context) (int, error) {
+// GetCPUCountOfManagedNode gets the cpu count of managed node.
+func (mgr *TaskManager) GetCPUCountOfManagedNode(ctx context.Context) (int, error) {
+	var cnt int
+	err := mgr.WithNewSession(func(se sessionctx.Context) error {
+		var err2 error
+		cnt, err2 = mgr.getCPUCountOfManagedNode(ctx, se)
+		return err2
+	})
+	return cnt, err
+}
+
+// getCPUCountOfManagedNode gets the cpu count of managed node.
+// returns error when there's no managed node or no node has valid cpu count.
+func (mgr *TaskManager) getCPUCountOfManagedNode(ctx context.Context, se sessionctx.Context) (int, error) {
 	nodes, err := mgr.getManagedNodesWithSession(ctx, se)
 	if err != nil {
 		return 0, err
@@ -1273,5 +1285,8 @@ func (mgr *TaskManager) getCPUCountOfManagedNodes(ctx context.Context, se sessionctx.Context) (int, error) {
 			break
 		}
 	}
+	if cpuCount == 0 {
+		return 0, errors.New("no managed node have enough resource for dist task")
+	}
 	return cpuCount, nil
 }
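Pieced together from the hunks above, the selection rule is: use the CPU count of the first managed node that reports a positive value, and fail when none does. A condensed sketch of that rule (the "no managed nodes" error apparently originates in getManagedNodesWithSession; the ManagedNode shape follows the test expectations earlier in this diff):

	func cpuCountOf(nodes []proto.ManagedNode) (int, error) {
		if len(nodes) == 0 {
			return 0, errors.New("no managed nodes")
		}
		for _, node := range nodes {
			if node.CPUCount > 0 {
				// Managed nodes are assumed homogeneous: the first valid
				// count stands in for the whole group.
				return node.CPUCount, nil
			}
		}
		return 0, errors.New("no managed node have enough resource for dist task")
	}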
12 changes: 0 additions & 12 deletions pkg/disttask/framework/storage/task_table_test.go
@@ -15,13 +15,11 @@
 package storage

 import (
-	"context"
 	"testing"

 	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/pingcap/tidb/pkg/kv"
-	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/testkit/testsetup"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/goleak"
@@ -39,16 +37,6 @@ func TestMain(m *testing.M) {
 	goleak.VerifyTestMain(m, opts...)
 }

-func GetCPUCountOfManagedNodes(ctx context.Context, taskMgr *TaskManager) (int, error) {
-	var cnt int
-	err := taskMgr.WithNewSession(func(se sessionctx.Context) error {
-		var err2 error
-		cnt, err2 = taskMgr.getCPUCountOfManagedNodes(ctx, se)
-		return err2
-	})
-	return cnt, err
-}
-
 func TestSplitSubtasks(t *testing.T) {
 	tm := &TaskManager{}
 	subtasks := make([]*proto.Subtask, 0, 10)
2 changes: 1 addition & 1 deletion pkg/disttask/importinto/encode_and_sort_operator.go
@@ -89,7 +89,7 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor,
 	pool := workerpool.NewWorkerPool(
 		"encodeAndSortOperator",
 		util.ImportInto,
-		int(executor.taskMeta.Plan.ThreadCnt),
+		executor.taskMeta.Plan.ThreadCnt,
 		func() workerpool.Worker[*importStepMinimalTask, workerpool.None] {
 			return newChunkWorker(ctx, op, indexMemorySizeLimit)
 		},
4 changes: 2 additions & 2 deletions pkg/disttask/importinto/job.go
@@ -182,7 +182,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error) {
 		SessionCtx: se,
 		TaskKey: TaskKey(jobID),
 		TaskType: proto.ImportInto,
-		ThreadCnt: int(plan.ThreadCnt),
+		ThreadCnt: plan.ThreadCnt,
 	}
 	p := planner.NewPlanner()
 	taskID, err2 = p.Run(planCtx, logicalPlan)
@@ -206,7 +206,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error) {
 	ti.logger = ti.logger.With(zap.Int64("task-id", task.ID))

 	ti.logger.Info("job submitted to task queue",
-		zap.Int64("job-id", jobID), zap.Int64("thread-cnt", plan.ThreadCnt))
+		zap.Int64("job-id", jobID), zap.Int("thread-cnt", plan.ThreadCnt))

 	return jobID, task, nil
 }
2 changes: 1 addition & 1 deletion pkg/disttask/importinto/task_executor.go
@@ -323,7 +323,7 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
 		1*size.MB,
 		8*1024,
 		onClose,
-		int(m.taskMeta.Plan.ThreadCnt),
+		m.taskMeta.Plan.ThreadCnt,
 		false)
 }
27 changes: 16 additions & 11 deletions pkg/executor/import_into.go
@@ -64,6 +64,8 @@ type ImportIntoExec struct {
 	controller *importer.LoadDataController
 	stmt string

+	plan *plannercore.ImportInto
+	tbl table.Table
 	dataFilled bool
 }

@@ -73,22 +75,13 @@ var (
 )

 func newImportIntoExec(b exec.BaseExecutor, selectExec exec.Executor, userSctx sessionctx.Context,
 	plan *plannercore.ImportInto, tbl table.Table) (*ImportIntoExec, error) {
-	importPlan, err := importer.NewImportPlan(userSctx, plan, tbl)
-	if err != nil {
-		return nil, err
-	}
-	astArgs := importer.ASTArgsFromImportPlan(plan)
-	controller, err := importer.NewLoadDataController(importPlan, tbl, astArgs)
-	if err != nil {
-		return nil, err
-	}
 	return &ImportIntoExec{
 		BaseExecutor: b,
 		selectExec: selectExec,
 		userSctx: userSctx,
-		importPlan: importPlan,
-		controller: controller,
 		stmt: plan.Stmt,
+		plan: plan,
+		tbl: tbl,
 	}, nil
 }

@@ -100,6 +93,18 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 		// need to return an empty req to indicate all results have been written
 		return nil
 	}
+	importPlan, err := importer.NewImportPlan(ctx, e.userSctx, e.plan, e.tbl)
+	if err != nil {
+		return err
+	}
+	astArgs := importer.ASTArgsFromImportPlan(e.plan)
+	controller, err := importer.NewLoadDataController(importPlan, e.tbl, astArgs)
+	if err != nil {
+		return err
+	}
+	e.importPlan = importPlan
+	e.controller = controller
+
 	if err2 := e.controller.InitDataFiles(ctx); err2 != nil {
 		return err2
 	}
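The net change in this file: newImportIntoExec now only stashes the raw plan and table, and the ImportPlan/LoadDataController construction moves into Next, where a context.Context is available (NewImportPlan now takes ctx as its first argument). Reduced to a sketch with hypothetical names, the lazy-construction pattern is:

	// Hypothetical types, for illustration only.
	type lazyExec struct {
		raw  *RawPlan    // cheap inputs captured at construction time
		ctrl *Controller // heavy state, built at execution time
	}

	func (e *lazyExec) Next(ctx context.Context) error {
		if e.ctrl == nil {
			ctrl, err := NewController(ctx, e.raw) // needs ctx, which the constructor never had
			if err != nil {
				return err
			}
			e.ctrl = ctrl
		}
		return e.ctrl.Run(ctx)
	}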
6 changes: 4 additions & 2 deletions pkg/executor/importer/BUILD.bazel
@@ -32,6 +32,7 @@ go_library(
 	"//br/pkg/utils",
 	"//pkg/config",
 	"//pkg/ddl/util",
+	"//pkg/disttask/framework/handle",
 	"//pkg/expression",
 	"//pkg/keyspace",
 	"//pkg/kv",
@@ -85,15 +86,15 @@ go_test(
 	srcs = [
 		"chunk_process_testkit_test.go",
 		"import_test.go",
+		"importer_testkit_test.go",
 		"job_test.go",
 		"precheck_test.go",
 		"table_import_test.go",
-		"table_import_testkit_test.go",
 	],
 	embed = [":importer"],
 	flaky = True,
 	race = "on",
-	shard_count = 21,
+	shard_count = 22,
 	deps = [
 		"//br/pkg/errors",
 		"//br/pkg/lightning/backend/encode",
@@ -107,6 +108,7 @@ go_test(
 	"//br/pkg/streamhelper",
 	"//br/pkg/utils",
 	"//pkg/config",
+	"//pkg/disttask/framework/testutil",
 	"//pkg/expression",
 	"//pkg/infoschema",
 	"//pkg/kv",