Skip to content

Commit

Permalink
*: make load data atomic by default (#18807)
Browse files Browse the repository at this point in the history
Signed-off-by: Tina Fritz <tina77fritz@gmail.com>
  • Loading branch information
tina77fritz authored Aug 6, 2020
1 parent d5dec67 commit 32963d3
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 13 deletions.
5 changes: 3 additions & 2 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
}

// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() && config.GetGlobalConfig().EnableBatchDML
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() &&
config.GetGlobalConfig().EnableBatchDML && batchDMLSize > 0
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
memUsageOfChk := int64(0)
Expand Down
4 changes: 2 additions & 2 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func insertRows(ctx context.Context, base insertCommon) (err error) {
return err
}
sessVars := e.ctx.GetSessionVars()
batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML
batchSize := sessVars.DMLBatchSize
batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML && batchSize > 0

e.lazyFillAutoID = true
evalRowFunc := e.fastEvalRow
Expand Down Expand Up @@ -403,8 +403,8 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
// If StrictSQLMode is disabled and it is a insert-select statement, it also handle BadNullAsWarning.
sessVars.StmtCtx.BadNullAsWarning = true
}
batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML
batchSize := sessVars.DMLBatchSize
batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML && batchSize > 0
memUsageOfRows := int64(0)
memTracker := e.memTracker
for {
Expand Down
7 changes: 7 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ func (s *testSuite5) TestSetVar(c *C) {
c.Assert(err, NotNil)
_, err = tk.Exec(`select @@session.tidb_slow_log_masking;`)
c.Assert(err, NotNil)

tk.MustQuery("select @@tidb_dml_batch_size;").Check(testkit.Rows("0"))
tk.MustExec("set @@session.tidb_dml_batch_size = 120")
tk.MustQuery("select @@tidb_dml_batch_size;").Check(testkit.Rows("120"))
c.Assert(tk.ExecToErr("set @@session.tidb_dml_batch_size = -120"), NotNil)
c.Assert(tk.ExecToErr("set @@global.tidb_dml_batch_size = 120"), NotNil)
tk.MustQuery("select @@tidb_dml_batch_size;").Check(testkit.Rows("120"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
11 changes: 5 additions & 6 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ type SessionVars struct {
Concurrency
MemQuota
BatchSize
// DMLBatchSize indicates the number of rows batch-committed for a statement.
// It will be used when using LOAD DATA or BatchInsert or BatchDelete is on.
DMLBatchSize int
RetryLimit int64
DisableTxnAutoRetry bool
// UsersLock is a lock for user defined variables.
Expand Down Expand Up @@ -832,8 +835,8 @@ func NewSessionVars() *SessionVars {
IndexLookupSize: DefIndexLookupSize,
InitChunkSize: DefInitChunkSize,
MaxChunkSize: DefMaxChunkSize,
DMLBatchSize: DefDMLBatchSize,
}
vars.DMLBatchSize = DefDMLBatchSize
var enableStreaming string
if config.GetGlobalConfig().EnableStreaming {
enableStreaming = "1"
Expand Down Expand Up @@ -1250,7 +1253,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBBatchCommit:
s.BatchCommit = TiDBOptOn(val)
case TiDBDMLBatchSize:
s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize)
s.DMLBatchSize = int(tidbOptInt64(val, DefOptCorrelationExpFactor))
case TiDBCurrentTS, TiDBConfig:
return ErrReadOnly
case TiDBMaxChunkSize:
Expand Down Expand Up @@ -1696,10 +1699,6 @@ type MemQuota struct {

// BatchSize defines batch size values.
type BatchSize struct {
// DMLBatchSize indicates the size of batches for DML.
// It will be used when BatchInsert or BatchDelete is on.
DMLBatchSize int

// IndexJoinBatchSize is the batch size of a index lookup join.
IndexJoinBatchSize int

Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ const (
DefCurretTS = 0
DefInitChunkSize = 32
DefMaxChunkSize = 1024
DefDMLBatchSize = 20000
DefDMLBatchSize = 0
DefMaxPreparedStmtCount = -1
DefWaitTimeout = 0
DefTiDBMemQuotaHashJoin = 32 << 30 // 32GB.
Expand Down
4 changes: 2 additions & 2 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc
TiDBIndexLookupSize,
TiDBDDLReorgWorkerCount,
TiDBBackoffLockFast, TiDBBackOffWeight,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
Expand All @@ -557,7 +557,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
case TiDBOptCorrelationExpFactor:
case TiDBOptCorrelationExpFactor, TiDBDMLBatchSize:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
Expand Down

0 comments on commit 32963d3

Please sign in to comment.