diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index 3badbfc378b9a..d8308c0eb35ed 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -17,6 +17,7 @@ package importinto import ( "context" "encoding/json" + "runtime" "sync" "github.com/pingcap/errors" @@ -223,7 +224,7 @@ func init() { logger.Info("create step scheduler") return &taskMeta, logger, nil } - scheduler.RegisterTaskType(proto.ImportInto) + scheduler.RegisterTaskType(proto.ImportInto, scheduler.WithPoolSize(int32(runtime.GOMAXPROCS(0)))) scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepImport, func(taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) { taskMeta, logger, err := prepareFn(taskID, bs, step) diff --git a/executor/importer/import.go b/executor/importer/import.go index 1e96c7cad71c3..024684dce9f56 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -460,7 +460,7 @@ func (e *LoadDataController) checkFieldParams() error { } func (p *Plan) initDefaultOptions() { - threadCnt := runtime.NumCPU() + threadCnt := runtime.GOMAXPROCS(0) failpoint.Inject("mockNumCpu", func(val failpoint.Value) { threadCnt = val.(int) }) @@ -637,7 +637,7 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load func (p *Plan) adjustOptions() { // max value is cpu-count - numCPU := int64(runtime.NumCPU()) + numCPU := int64(runtime.GOMAXPROCS(0)) if p.ThreadCnt > numCPU { log.L().Info("IMPORT INTO thread count is larger than cpu-count, set to cpu-count") p.ThreadCnt = numCPU diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go index 4b76555681dcd..115c620ae2fb5 100644 --- a/executor/importer/import_test.go +++ b/executor/importer/import_test.go @@ -102,7 +102,7 @@ func TestInitOptionsPositiveCase(t *testing.T) { require.Equal(t, uint64(3), plan.IgnoreLines, sql) require.Equal(t, config.ByteSize(100<<30), plan.DiskQuota, sql) require.Equal(t, config.OpLevelOptional, plan.Checksum, sql) - require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs + require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs require.Equal(t, config.ByteSize(200<<20), plan.MaxWriteSpeed, sql) require.True(t, plan.SplitFile, sql) require.Equal(t, int64(123), plan.MaxRecordedErrors, sql) @@ -117,7 +117,7 @@ func TestAdjustOptions(t *testing.T) { MaxWriteSpeed: 10, } plan.adjustOptions() - require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt) + require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt) require.Equal(t, config.ByteSize(10), plan.MaxWriteSpeed) // not adjusted }