Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import into: change max pool size to num-cpu #44749

Merged
merged 2 commits into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package importinto
import (
"context"
"encoding/json"
"runtime"
"sync"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -223,7 +224,7 @@ func init() {
logger.Info("create step scheduler")
return &taskMeta, logger, nil
}
scheduler.RegisterTaskType(proto.ImportInto)
scheduler.RegisterTaskType(proto.ImportInto, scheduler.WithPoolSize(int32(runtime.GOMAXPROCS(0))))
scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepImport,
func(taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) {
taskMeta, logger, err := prepareFn(taskID, bs, step)
Expand Down
4 changes: 2 additions & 2 deletions executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (e *LoadDataController) checkFieldParams() error {
}

func (p *Plan) initDefaultOptions() {
threadCnt := runtime.NumCPU()
threadCnt := runtime.GOMAXPROCS(0)
failpoint.Inject("mockNumCpu", func(val failpoint.Value) {
threadCnt = val.(int)
})
Expand Down Expand Up @@ -637,7 +637,7 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load

func (p *Plan) adjustOptions() {
// max value is cpu-count
numCPU := int64(runtime.NumCPU())
numCPU := int64(runtime.GOMAXPROCS(0))
if p.ThreadCnt > numCPU {
log.L().Info("IMPORT INTO thread count is larger than cpu-count, set to cpu-count")
p.ThreadCnt = numCPU
Expand Down
4 changes: 2 additions & 2 deletions executor/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestInitOptionsPositiveCase(t *testing.T) {
require.Equal(t, uint64(3), plan.IgnoreLines, sql)
require.Equal(t, config.ByteSize(100<<30), plan.DiskQuota, sql)
require.Equal(t, config.OpLevelOptional, plan.Checksum, sql)
require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs
require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs
require.Equal(t, config.ByteSize(200<<20), plan.MaxWriteSpeed, sql)
require.True(t, plan.SplitFile, sql)
require.Equal(t, int64(123), plan.MaxRecordedErrors, sql)
Expand All @@ -117,7 +117,7 @@ func TestAdjustOptions(t *testing.T) {
MaxWriteSpeed: 10,
}
plan.adjustOptions()
require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt)
require.Equal(t, int64(runtime.GOMAXPROCS(0)), plan.ThreadCnt)
require.Equal(t, config.ByteSize(10), plan.MaxWriteSpeed) // not adjusted
}

Expand Down