-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: load data statement, separate data preparing routine and commit routine #11533
Conversation
/run-all-tests |
Could you do a test for many columns(such as 100 columns) ? |
en. will construct such testing scenarios, and profiling defailed cpu/memory usage |
/run-all-tests |
executor/load_data.go
Outdated
// EnqOneTask feed one batch commit task to commit routine | ||
func (e *LoadDataInfo) EnqOneTask() { | ||
if e.curBatchCnt > 0 { | ||
logutil.BgLogger().Info("tried to enqueue one task current len", zap.Uint64("len", e.curBatchCnt)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make the committed batch size configurable. It is better to set it to debug level.
executor/load_data.go
Outdated
// InitQueues initialize task queue and error report queue | ||
func (e *LoadDataInfo) InitQueues() { | ||
e.commitTaskQueue = make(chan commitTask, 1000) | ||
e.commitErrQueue = make(chan error, 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make(chan error)
server/conn.go
Outdated
} | ||
}() | ||
// start commit worker routine | ||
go loadDataInfo.CommitRoutine(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not make the load process into a separate goroutine and keep the commit process in this goroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed PTAL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please fix ci.
Codecov Report
@@ Coverage Diff @@
## master #11533 +/- ##
===========================================
Coverage 81.2905% 81.2905%
===========================================
Files 454 454
Lines 98720 98720
===========================================
Hits 80250 80250
Misses 12725 12725
Partials 5745 5745 |
/run-all-tests |
sessionctx/variable/tidb_vars.go
Outdated
@@ -286,6 +286,9 @@ const ( | |||
|
|||
// TiDBEnableNoopFuncs set true will enable using fake funcs(like get_lock release_lock) | |||
TiDBEnableNoopFuncs = "tidb_enable_noop_functions" | |||
|
|||
// TiDBLoadDataSeqProcess set true will make load data process and commit sequentially in one routine | |||
TiDBLoadDataSeqProcess = "tidb_load_data_seq_process" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add some tests for it.
executor/load_data.go
Outdated
defer func() { | ||
r := recover() | ||
if r != nil { | ||
buf := make([]byte, 4096) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Zap log library has a Stack function to get stack tracing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
roger, changed
executor/load_data.go
Outdated
logutil.Logger(ctx).Error("commit error refresh", zap.Error(err)) | ||
return err | ||
} | ||
// this only set in sequential mode, e.rows buffer will be reused in sequential mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is sequential mode?
executor/load_data.go
Outdated
return err | ||
} | ||
} | ||
logutil.Logger(ctx).Debug("one task enqueued ", zap.Int("current queue len", len(e.commitTaskQueue))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove debug message.
executor/load_data.go
Outdated
break | ||
} | ||
tasks++ | ||
logutil.Logger(ctx).Debug("commit one task finished", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
executor/load_data.go
Outdated
zap.Int("pending tasks count", len(e.commitTaskQueue))) | ||
} else { | ||
end = true | ||
logutil.Logger(ctx).Info("commit work all finished", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
server/conn.go
Outdated
} | ||
if err = loadDataInfo.Ctx.StmtCommit(); err != nil { | ||
return nil, err | ||
if enqTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not need to use enqTask argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
param removed
server/conn.go
Outdated
defer func() { | ||
r := recover() | ||
if r != nil { | ||
buf := make([]byte, 4096) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use zap library instead.
Please do not force push your commit. Just commit is fine. |
executor/load_data.go
Outdated
|
||
// these fields are used for pipeline data prepare and commit | ||
commitTaskQueue chan CommitTask | ||
QuitCommit chan struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can do it by only using one signal channel.
/run-unit-test |
/run-all-tests |
@cfzjywxk merge failed. |
/rebuild |
What problem does this PR solve?
experiments diff on
#11517
What is changed and how it works?
load data statement
bench workload1
sysbench schema
1m rows = 185MB
10m rows = 1.9GB
batch size = 20k
batch size = 200k
batch size = 4k
data size = 10m
bench workload2
this workload task generator routine and commit routine cooperate well, producing and committing seem concurrently progressing. Master branch do all these things sequentially
column content = 99999999
100columns 1m rows = 852MB
500columns 1m rows = 4.2GB
500columns
bench workload3
wide table with 100 columns (all columns with BLOB or TEXT fields) , big column
work queue size 1000(max wait job numbers)
( txn limit 100MB)
schema like
100k rows = 1.2GB
some conclusions up to now
workload3 batch size = 5k and batch size = 1k have much different performance, batch size = 5kmany error logs in like
from monitor data on grafana, we see prewrite on kv is quit slow, the root cause should be the prewrite request is quite slow(32sec on 5k batch size)tikv/tikv#5279
has some explanations on prewrite slow on tikv executing workload3(big rowsize)
this could be solved by increase tikv raftstore.raft-max-inflight-msgs configuration(default -> 4096), allow more un ack raft messages in queue, after changing this config,
Check List
Tests
Code changes
Side effects
Related changes