Skip to content

Commit

Permalink
table: Add option DupKeyCheckLazy to check duplicated key lazily (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Aug 8, 2024
1 parent 3357f26 commit d4c26ca
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 78 deletions.
2 changes: 2 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor {
insert := &InsertExec{
InsertValues: ivs,
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
IgnoreErr: v.IgnoreErr,
}
return insert
}
Expand Down Expand Up @@ -2650,6 +2651,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) exec.Executor {
tblID2table: tblID2table,
tblColPosInfos: v.TblColPosInfos,
assignFlag: assignFlag,
IgnoreError: v.IgnoreError,
}
updateExec.fkChecks, b.err = buildTblID2FKCheckExecs(b.ctx, tblID2table, v.FKChecks)
if b.err != nil {
Expand Down
58 changes: 45 additions & 13 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand All @@ -47,6 +47,7 @@ import (
type InsertExec struct {
*InsertValues
OnDuplicate []*expression.Assignment
IgnoreErr bool
evalBuffer4Dup chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum
Expand All @@ -66,7 +67,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
sessVars := e.Ctx().GetSessionVars()
defer sessVars.CleanBuffers()
ignoreErr := sessVars.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError

txn, err := e.Ctx().Txn(true)
if err != nil {
Expand All @@ -92,13 +92,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if err != nil {
return err
}
} else if ignoreErr {
} else if e.IgnoreErr {
err := e.batchCheckAndInsert(ctx, rows, e.addRecord, false)
if err != nil {
return err
}
} else {
start := time.Now()
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessVars, txn)
for i, row := range rows {
var err error
sizeHintStep := int(sessVars.ShardAllocateStep)
Expand All @@ -108,9 +109,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if sizeHint > remain {
sizeHint = remain
}
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck)
} else {
err = e.addRecord(ctx, row, table.DupKeyCheckDefault)
err = e.addRecord(ctx, row, dupKeyCheck)
}
if err != nil {
return err
Expand Down Expand Up @@ -193,7 +194,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand All @@ -204,7 +205,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch]
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch)
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck)
if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err)
Expand Down Expand Up @@ -236,14 +237,26 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
e.stats.Prefetch += time.Since(prefetchStart)
}

// Use `optimizeDupKeyCheckForUpdate` to determine the update operation when the row meets the conflict in
// `INSERT ... ON DUPLICATE KEY UPDATE` statement.
// Though it is in an insert statement, `ON DUP KEY UPDATE` follows the dup-key check behavior of update.
// For example, it will ignore variable `tidb_constraint_check_in_place`, see the test case:
// https://github.com/pingcap/tidb/blob/3117d3fae50bbb5dabcde7b9589f92bfbbda5dc6/pkg/executor/test/writetest/write_test.go#L419-L426
updateDupKeyCheck := optimizeDupKeyCheckForUpdate(txn, e.IgnoreErr)
// Do not use `updateDupKeyCheck` for `AddRecord` because it is not optimized for insert.
// It seems that we can just use `DupKeyCheckSkip` here because all constraints are checked.
// But we still use `optimizeDupKeyCheckForNormalInsert` to make the refactor same behavior with the original code.
// TODO: just use `DupKeyCheckSkip` here.
addRecordDupKeyCheck := optimizeDupKeyCheckForNormalInsert(e.Ctx().GetSessionVars(), txn)

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck)
if err == nil {
continue
}
Expand All @@ -260,7 +273,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if handle == nil {
continue
}
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand All @@ -282,7 +295,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
err := e.addRecord(ctx, newRows[i], table.DupKeyCheckDefault)
err := e.addRecord(ctx, newRows[i], addRecordDupKeyCheck)
if err != nil {
return err
}
Expand All @@ -294,6 +307,25 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return nil
}

// optimizeDupKeyCheckForNormalInsert tries to optimize the DupKeyCheckMode for an insert statement
// according to the transaction state and system variables.
// It returns `DupKeyCheckLazy` when the duplicated-key check can be postponed (avoiding redundant
// read requests to TiKV), and `DupKeyCheckInPlace` otherwise.
// This method only works for "normal" insert statements; options such as "IGNORE" and
// "ON DUPLICATE KEY" are not considered here, and callers should handle those cases themselves.
func optimizeDupKeyCheckForNormalInsert(vars *variable.SessionVars, txn kv.Transaction) table.DupKeyCheckMode {
	// The in-place check is only required when ALL of the following hold:
	// - the transaction is optimistic (a pessimistic txn re-checks conflicts at the lock stage),
	// - it is not pipelined (`@@tidb_dml_type="bulk"` prefers lazy checking for performance),
	// - and `tidb_constraint_check_in_place` is enabled.
	if vars.ConstraintCheckInPlace && !txn.IsPessimistic() && !txn.IsPipelined() {
		return table.DupKeyCheckInPlace
	}
	// In every other case the duplicated key can be checked lazily, without
	// fetching existing keys from storage up front.
	return table.DupKeyCheckLazy
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
Expand Down Expand Up @@ -379,7 +411,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int) error {
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand Down Expand Up @@ -426,7 +458,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades)
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode)
if err != nil {
return err
}
Expand All @@ -440,7 +472,7 @@ func (e *InsertExec) setMessage() {
if e.SelectExec != nil || numRecords > 1 {
numWarnings := stmtCtx.WarningCount()
var numDuplicates uint64
if stmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError {
if e.IgnoreErr {
// if ignoreErr
numDuplicates = numRecords - stmtCtx.CopiedRows()
} else {
Expand Down
4 changes: 0 additions & 4 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,15 +1412,11 @@ func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int, dupKeyCheck table.DupKeyCheckMode,
) (err error) {
vars := e.Ctx().GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
}
if reserveAutoIDCount > 0 {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck)
} else {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck)
}
vars.PresumeKeyNotExists = false
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,11 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
case ast.OnDuplicateKeyHandlingIgnore:
return w.batchCheckAndInsert(ctx, rows[0:cnt], w.addRecordLD, false)
case ast.OnDuplicateKeyHandlingError:
txn, err := w.Ctx().Txn(true)
if err != nil {
return err
}
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(w.Ctx().GetSessionVars(), txn)
for i, row := range rows[0:cnt] {
sizeHintStep := int(w.Ctx().GetSessionVars().ShardAllocateStep)
if sizeHintStep > 0 && i%sizeHintStep == 0 {
Expand All @@ -655,9 +660,9 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
if sizeHint > remain {
sizeHint = remain
}
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck)
} else {
err = w.addRecord(ctx, row, table.DupKeyCheckDefault)
err = w.addRecord(ctx, row, dupKeyCheck)
}
if err != nil {
return err
Expand Down
11 changes: 7 additions & 4 deletions pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow, dupKeyCheck table.DupKeyCheckMode) error {
txn, err := e.Ctx().Txn(true)
if err != nil {
return err
Expand Down Expand Up @@ -106,7 +106,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
err = e.addRecord(ctx, r.row, table.DupKeyCheckDefault)
err = e.addRecord(ctx, r.row, dupKeyCheck)
if err != nil {
return err
}
Expand Down Expand Up @@ -180,9 +180,12 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
if e.stats != nil {
e.stats.Prefetch = time.Since(prefetchStart)
}
e.Ctx().GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
sessionVars := e.Ctx().GetSessionVars()
sessionVars.StmtCtx.AddRecordRows(uint64(len(newRows)))
// TODO: seems we can optimize it to `DupKeyCheckSkip` because all conflict rows are deleted in previous steps.
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessionVars, txn)
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
err = e.replaceRow(ctx, r, dupKeyCheck)
if err != nil {
return err
}
Expand Down
60 changes: 52 additions & 8 deletions pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type UpdateExec struct {
fkChecks map[int64][]*FKCheckExec
// fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec
fkCascades map[int64][]*FKCascadeExec

IgnoreError bool
}

// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
Expand Down Expand Up @@ -172,7 +174,7 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
return nil
}

func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum) error {
func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum, dupKeyCheck table.DupKeyCheckMode) error {
defer trace.StartRegion(ctx, "UpdateExec").End()
bAssignFlag := make([]bool, len(e.assignFlag))
for i, flag := range e.assignFlag {
Expand Down Expand Up @@ -205,7 +207,7 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat
// Update row
fkChecks := e.fkChecks[content.TblID]
fkCascades := e.fkCascades[content.TblID]
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades)
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades, dupKeyCheck)
if err1 == nil {
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
Expand Down Expand Up @@ -273,6 +275,13 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
memUsageOfChk := int64(0)
totalNumRows := 0

txn, err := e.Ctx().Txn(true)
if err != nil {
return 0, err
}

dupKeyCheck := optimizeDupKeyCheckForUpdate(txn, e.IgnoreError)
for {
e.memTracker.Consume(-memUsageOfChk)
err := exec.Next(ctx, e.Children(0), chk)
Expand All @@ -286,14 +295,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
if e.collectRuntimeStatsEnabled() {
txn, err := e.Ctx().Txn(true)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
if snap := txn.GetSnapshot(); snap != nil {
snap.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
txn, err := e.Ctx().Txn(true)
// pipelined dml may already flush in background, don't touch it to avoid race.
if err == nil && !txn.IsPipelined() {
if !txn.IsPipelined() {
sc := e.Ctx().GetSessionVars().StmtCtx
txn.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
if sc.KvExecCounter != nil {
Expand Down Expand Up @@ -329,7 +336,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
}
// write to table
if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow); err != nil {
if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow, dupKeyCheck); err != nil {
return 0, err
}
}
Expand Down Expand Up @@ -577,3 +584,40 @@ func (e *UpdateExec) GetFKCascades() []*FKCascadeExec {
func (e *UpdateExec) HasFKCascades() bool {
return len(e.fkCascades) > 0
}

// optimizeDupKeyCheckForUpdate tries to optimize the DupKeyCheckMode for an update statement.
// It returns `DupKeyCheckLazy` when the duplicated-key check can be postponed (avoiding redundant
// requests to TiKV), and `DupKeyCheckInPlace` otherwise.
// The second argument `ignoreNeedsCheckInPlace` is true if the `IGNORE` keyword is used in the
// update statement.
func optimizeDupKeyCheckForUpdate(txn kv.Transaction, ignoreNeedsCheckInPlace bool) table.DupKeyCheckMode {
	switch {
	case txn.IsPipelined():
		// `@@tidb_dml_type='bulk'` indicates rows are written in "bulk" mode, where
		// `DupKeyCheckLazy` should be used to improve performance.
		// If "bulk" mode and the IGNORE keyword are used together, "bulk" takes priority, see:
		// https://github.com/pingcap/tidb/issues/55187#issuecomment-2268356459
		return table.DupKeyCheckLazy
	case ignoreNeedsCheckInPlace:
		// For `UPDATE IGNORE ...` and `INSERT IGNORE ... ON DUPLICATE KEY UPDATE ...` statements,
		// `DupKeyCheckInPlace` must be used so the executor can observe the error
		// immediately and then ignore it.
		return table.DupKeyCheckInPlace
	case txn.IsPessimistic():
		// In a pessimistic transaction the duplicated-key check can be postponed to the
		// lock stage, so the lazy mode is safe here.
		// Note that for an optimistic transaction this function always returns
		// `DupKeyCheckInPlace`, even if `tidb_constraint_check_in_place` is `OFF`:
		// that variable is only designed for insert cases, see the comments in issue:
		// https://github.com/pingcap/tidb/issues/54492#issuecomment-2229941881
		return table.DupKeyCheckLazy
	default:
		return table.DupKeyCheckInPlace
	}
}
Loading

0 comments on commit d4c26ca

Please sign in to comment.