Skip to content

Commit

Permalink
loaddata: support concurrent writing to TiKV (#42667)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 committed Mar 31, 2023
1 parent 4e30a64 commit 31a3b2c
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 130 deletions.
1 change: 1 addition & 0 deletions executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_test(
srcs = ["import_test.go"],
embed = [":importer"],
flaky = True,
race = "on",
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/config",
Expand Down
23 changes: 15 additions & 8 deletions executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type LoadDataController struct {
checksum config.PostOpLevel
addIndex bool
analyze config.PostOpLevel
threadCnt int64
ThreadCnt int64
BatchSize int64
maxWriteSpeed config.ByteSize // per second
splitFile bool
Expand Down Expand Up @@ -322,8 +322,13 @@ func (e *LoadDataController) initFieldParams(plan *plannercore.LoadData) error {
return nil
}

var ignoreInTest = false

func (e *LoadDataController) initDefaultOptions() {
threadCnt := runtime.NumCPU()
if intest.InTest && !ignoreInTest {
threadCnt = 1
}
if e.Format == LoadDataFormatParquet {
threadCnt = int(math.Max(1, float64(threadCnt)*0.75))
}
Expand All @@ -333,7 +338,7 @@ func (e *LoadDataController) initDefaultOptions() {
e.checksum = config.OpLevelRequired
e.addIndex = true
e.analyze = config.OpLevelOptional
e.threadCnt = int64(threadCnt)
e.ThreadCnt = int64(threadCnt)
e.BatchSize = 1000
e.maxWriteSpeed = unlimitedWriteSpeed
e.splitFile = false
Expand Down Expand Up @@ -424,8 +429,8 @@ func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*pl
}
if opt, ok := specifiedOptions[threadOption]; ok {
// boolean true will be taken as 1
e.threadCnt, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull || e.threadCnt <= 0 {
e.ThreadCnt, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
if err != nil || isNull || e.ThreadCnt <= 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
Expand Down Expand Up @@ -476,8 +481,8 @@ func (e *LoadDataController) adjustOptions() {
}
// max value is cpu-count
numCPU := int64(runtime.NumCPU())
if e.threadCnt > numCPU {
e.threadCnt = numCPU
if e.ThreadCnt > numCPU {
e.ThreadCnt = numCPU
}
if e.maxWriteSpeed < minWriteSpeed {
e.maxWriteSpeed = minWriteSpeed
Expand Down Expand Up @@ -714,8 +719,10 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
}

// GetParser returns a parser for the data file.
func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo LoadDataReaderInfo) (
parser mydump.Parser, err error) {
func (e *LoadDataController) GetParser(
ctx context.Context,
dataFileInfo LoadDataReaderInfo,
) (parser mydump.Parser, err error) {
reader, err2 := dataFileInfo.Opener(ctx)
if err2 != nil {
return nil, err2
Expand Down
17 changes: 11 additions & 6 deletions executor/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ import (
)

func TestInitDefaultOptions(t *testing.T) {
ignoreInTest = true
t.Cleanup(func() {
ignoreInTest = false
})

e := LoadDataController{}
e.initDefaultOptions()
require.Equal(t, LogicalImportMode, e.ImportMode)
require.Equal(t, config.ByteSize(50<<30), e.diskQuota)
require.Equal(t, config.OpLevelRequired, e.checksum)
require.Equal(t, true, e.addIndex)
require.Equal(t, config.OpLevelOptional, e.analyze)
require.Equal(t, int64(runtime.NumCPU()), e.threadCnt)
require.Equal(t, int64(runtime.NumCPU()), e.ThreadCnt)
require.Equal(t, int64(1000), e.BatchSize)
require.Equal(t, unlimitedWriteSpeed, e.maxWriteSpeed)
require.Equal(t, false, e.splitFile)
Expand All @@ -49,8 +54,8 @@ func TestInitDefaultOptions(t *testing.T) {

e = LoadDataController{Format: LoadDataFormatParquet}
e.initDefaultOptions()
require.Greater(t, e.threadCnt, int64(0))
require.Equal(t, int64(math.Max(1, float64(runtime.NumCPU())*0.75)), e.threadCnt)
require.Greater(t, e.ThreadCnt, int64(0))
require.Equal(t, int64(math.Max(1, float64(runtime.NumCPU())*0.75)), e.ThreadCnt)
}

func TestInitOptions(t *testing.T) {
Expand Down Expand Up @@ -165,7 +170,7 @@ func TestInitOptions(t *testing.T) {
require.Equal(t, config.OpLevelOptional, e.checksum, sql)
require.False(t, e.addIndex, sql)
require.Equal(t, config.OpLevelRequired, e.analyze, sql)
require.Equal(t, int64(runtime.NumCPU()), e.threadCnt, sql)
require.Equal(t, int64(runtime.NumCPU()), e.ThreadCnt, sql)
require.Equal(t, int64(2000), e.BatchSize, sql)
require.Equal(t, config.ByteSize(200<<20), e.maxWriteSpeed, sql)
require.True(t, e.splitFile, sql)
Expand All @@ -176,12 +181,12 @@ func TestInitOptions(t *testing.T) {
func TestAdjustOptions(t *testing.T) {
e := LoadDataController{
diskQuota: 1,
threadCnt: 100000000,
ThreadCnt: 100000000,
maxWriteSpeed: 10,
}
e.adjustOptions()
require.Equal(t, minDiskQuota, e.diskQuota)
require.Equal(t, int64(runtime.NumCPU()), e.threadCnt)
require.Equal(t, int64(runtime.NumCPU()), e.ThreadCnt)
require.Equal(t, minWriteSpeed, e.maxWriteSpeed)
}

Expand Down
Loading

0 comments on commit 31a3b2c

Please sign in to comment.