Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: Add option DupKeyCheckLazy to check duplicated key lazily #55246

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor {
insert := &InsertExec{
InsertValues: ivs,
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
IgnoreErr: v.IgnoreErr,
}
return insert
}
Expand Down Expand Up @@ -2650,6 +2651,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) exec.Executor {
tblID2table: tblID2table,
tblColPosInfos: v.TblColPosInfos,
assignFlag: assignFlag,
IgnoreError: v.IgnoreError,
}
updateExec.fkChecks, b.err = buildTblID2FKCheckExecs(b.ctx, tblID2table, v.FKChecks)
if b.err != nil {
Expand Down
58 changes: 45 additions & 13 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand All @@ -47,6 +47,7 @@ import (
type InsertExec struct {
*InsertValues
OnDuplicate []*expression.Assignment
IgnoreErr bool
evalBuffer4Dup chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum
Expand All @@ -66,7 +67,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
sessVars := e.Ctx().GetSessionVars()
defer sessVars.CleanBuffers()
ignoreErr := sessVars.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError

txn, err := e.Ctx().Txn(true)
if err != nil {
Expand All @@ -92,13 +92,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if err != nil {
return err
}
} else if ignoreErr {
} else if e.IgnoreErr {
err := e.batchCheckAndInsert(ctx, rows, e.addRecord, false)
if err != nil {
return err
}
} else {
start := time.Now()
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessVars, txn)
for i, row := range rows {
var err error
sizeHintStep := int(sessVars.ShardAllocateStep)
Expand All @@ -108,9 +109,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if sizeHint > remain {
sizeHint = remain
}
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck)
} else {
err = e.addRecord(ctx, row, table.DupKeyCheckDefault)
err = e.addRecord(ctx, row, dupKeyCheck)
}
if err != nil {
return err
Expand Down Expand Up @@ -193,7 +194,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand All @@ -204,7 +205,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch]
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch)
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck)
if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err)
Expand Down Expand Up @@ -236,14 +237,26 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
e.stats.Prefetch += time.Since(prefetchStart)
}

// Use `optimizeDupKeyCheckForUpdate` to determine the update operation when the row meets the conflict in
// `INSERT ... ON DUPLICATE KEY UPDATE` statement.
// Though it is in an insert statement, `ON DUP KEY UPDATE` follows the dup-key check behavior of update.
// For example, it will ignore variable `tidb_constraint_check_in_place`, see the test case:
// https://github.com/pingcap/tidb/blob/3117d3fae50bbb5dabcde7b9589f92bfbbda5dc6/pkg/executor/test/writetest/write_test.go#L419-L426
updateDupKeyCheck := optimizeDupKeyCheckForUpdate(txn, e.IgnoreErr)
// Do not use `updateDupKeyCheck` for `AddRecord` because it is not optimized for insert.
// It seems that we can just use `DupKeyCheckSkip` here because all constraints are checked.
// But we still use `optimizeDupKeyCheckForNormalInsert` to make the refactor same behavior with the original code.
// TODO: just use `DupKeyCheckSkip` here.
addRecordDupKeyCheck := optimizeDupKeyCheckForNormalInsert(e.Ctx().GetSessionVars(), txn)

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck)
if err == nil {
continue
}
Expand All @@ -260,7 +273,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if handle == nil {
continue
}
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand All @@ -282,7 +295,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
err := e.addRecord(ctx, newRows[i], table.DupKeyCheckDefault)
err := e.addRecord(ctx, newRows[i], addRecordDupKeyCheck)
if err != nil {
return err
}
Expand All @@ -294,6 +307,25 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return nil
}

// optimizeDupKeyCheckForNormalInsert tries to optimize the DupKeyCheckMode for an insert statement
// according to the transaction and system variables.
// If the dup-key check of the current statement can be deferred, it returns `DupKeyCheckLazy` to avoid
// redundant requests to TiKV; otherwise, it returns `DupKeyCheckInPlace`.
// This function only applies to "normal" insert statements: options such as "IGNORE" and
// "ON DUPLICATE KEY" are not considered here, and callers should handle those cases themselves.
func optimizeDupKeyCheckForNormalInsert(vars *variable.SessionVars, txn kv.Transaction) table.DupKeyCheckMode {
	// Duplicated keys need to be checked in place (eagerly) only when all of the below hold:
	// - `tidb_constraint_check_in_place` is ON;
	// - the transaction is optimistic: for a pessimistic transaction the check can be
	//   postponed to the lock stage;
	// - the transaction is not pipelined: `@@tidb_dml_type="bulk"` always uses lazy
	//   checking to improve performance.
	if vars.ConstraintCheckInPlace && !txn.IsPessimistic() && !txn.IsPipelined() {
		return table.DupKeyCheckInPlace
	}
	return table.DupKeyCheckLazy
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
Expand Down Expand Up @@ -379,7 +411,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int) error {
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand Down Expand Up @@ -426,7 +458,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades)
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode)
if err != nil {
return err
}
Expand All @@ -440,7 +472,7 @@ func (e *InsertExec) setMessage() {
if e.SelectExec != nil || numRecords > 1 {
numWarnings := stmtCtx.WarningCount()
var numDuplicates uint64
if stmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError {
if e.IgnoreErr {
// if ignoreErr
numDuplicates = numRecords - stmtCtx.CopiedRows()
} else {
Expand Down
4 changes: 0 additions & 4 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,15 +1412,11 @@ func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int, dupKeyCheck table.DupKeyCheckMode,
) (err error) {
vars := e.Ctx().GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
}
if reserveAutoIDCount > 0 {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck)
} else {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck)
}
vars.PresumeKeyNotExists = false
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,11 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
case ast.OnDuplicateKeyHandlingIgnore:
return w.batchCheckAndInsert(ctx, rows[0:cnt], w.addRecordLD, false)
case ast.OnDuplicateKeyHandlingError:
txn, err := w.Ctx().Txn(true)
if err != nil {
return err
}
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(w.Ctx().GetSessionVars(), txn)
for i, row := range rows[0:cnt] {
sizeHintStep := int(w.Ctx().GetSessionVars().ShardAllocateStep)
if sizeHintStep > 0 && i%sizeHintStep == 0 {
Expand All @@ -655,9 +660,9 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
if sizeHint > remain {
sizeHint = remain
}
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck)
} else {
err = w.addRecord(ctx, row, table.DupKeyCheckDefault)
err = w.addRecord(ctx, row, dupKeyCheck)
}
if err != nil {
return err
Expand Down
11 changes: 7 additions & 4 deletions pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow, dupKeyCheck table.DupKeyCheckMode) error {
txn, err := e.Ctx().Txn(true)
if err != nil {
return err
Expand Down Expand Up @@ -106,7 +106,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
err = e.addRecord(ctx, r.row, table.DupKeyCheckDefault)
err = e.addRecord(ctx, r.row, dupKeyCheck)
if err != nil {
return err
}
Expand Down Expand Up @@ -180,9 +180,12 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
if e.stats != nil {
e.stats.Prefetch = time.Since(prefetchStart)
}
e.Ctx().GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
sessionVars := e.Ctx().GetSessionVars()
sessionVars.StmtCtx.AddRecordRows(uint64(len(newRows)))
// TODO: seems we can optimize it to `DupKeyCheckSkip` because all conflict rows are deleted in previous steps.
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessionVars, txn)
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
err = e.replaceRow(ctx, r, dupKeyCheck)
if err != nil {
return err
}
Expand Down
60 changes: 52 additions & 8 deletions pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type UpdateExec struct {
fkChecks map[int64][]*FKCheckExec
// fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec
fkCascades map[int64][]*FKCascadeExec

IgnoreError bool
}

// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
Expand Down Expand Up @@ -172,7 +174,7 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
return nil
}

func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum) error {
func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum, dupKeyCheck table.DupKeyCheckMode) error {
defer trace.StartRegion(ctx, "UpdateExec").End()
bAssignFlag := make([]bool, len(e.assignFlag))
for i, flag := range e.assignFlag {
Expand Down Expand Up @@ -205,7 +207,7 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat
// Update row
fkChecks := e.fkChecks[content.TblID]
fkCascades := e.fkCascades[content.TblID]
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades)
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades, dupKeyCheck)
if err1 == nil {
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
Expand Down Expand Up @@ -273,6 +275,13 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
memUsageOfChk := int64(0)
totalNumRows := 0

txn, err := e.Ctx().Txn(true)
if err != nil {
return 0, err
}

dupKeyCheck := optimizeDupKeyCheckForUpdate(txn, e.IgnoreError)
for {
e.memTracker.Consume(-memUsageOfChk)
err := exec.Next(ctx, e.Children(0), chk)
Expand All @@ -286,14 +295,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
if e.collectRuntimeStatsEnabled() {
txn, err := e.Ctx().Txn(true)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
if snap := txn.GetSnapshot(); snap != nil {
snap.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
txn, err := e.Ctx().Txn(true)
// pipelined dml may already flush in background, don't touch it to avoid race.
if err == nil && !txn.IsPipelined() {
if !txn.IsPipelined() {
sc := e.Ctx().GetSessionVars().StmtCtx
txn.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
if sc.KvExecCounter != nil {
Expand Down Expand Up @@ -329,7 +336,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
}
// write to table
if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow); err != nil {
if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow, dupKeyCheck); err != nil {
return 0, err
}
}
Expand Down Expand Up @@ -577,3 +584,40 @@ func (e *UpdateExec) GetFKCascades() []*FKCascadeExec {
func (e *UpdateExec) HasFKCascades() bool {
return len(e.fkCascades) > 0
}

// optimizeDupKeyCheckForUpdate tries to optimize the DupKeyCheckMode for an update statement.
// It returns `DupKeyCheckLazy` when the dup-key check can be deferred to avoid redundant
// requests to TiKV, and `DupKeyCheckInPlace` otherwise.
// The second argument `ignoreNeedsCheckInPlace` is true if the `IGNORE` keyword is used in the update statement.
func optimizeDupKeyCheckForUpdate(txn kv.Transaction, ignoreNeedsCheckInPlace bool) table.DupKeyCheckMode {
	switch {
	case txn.IsPipelined():
		// `@@tidb_dml_type='bulk'` indicates inserting rows in "bulk" mode,
		// where `DupKeyCheckLazy` is used to improve performance.
		// If "bulk" mode and the IGNORE keyword are used together, "bulk" takes priority, see:
		// https://github.com/pingcap/tidb/issues/55187#issuecomment-2268356459
		return table.DupKeyCheckLazy
	case ignoreNeedsCheckInPlace:
		// For `UPDATE IGNORE ...` and `INSERT IGNORE ... ON DUPLICATE KEY UPDATE ...` statements,
		// `DupKeyCheckInPlace` makes sure the executor gets the error immediately
		// so that it can be ignored.
		return table.DupKeyCheckInPlace
	case txn.IsPessimistic():
		// For a pessimistic transaction, the duplicate key check can be postponed
		// to the lock stage, so `DupKeyCheckLazy` is safe.
		// Please notice that for an optimistic transaction, `DupKeyCheckInPlace` is always
		// returned even if `tidb_constraint_check_in_place` is `OFF`.
		// That is because `tidb_constraint_check_in_place` is only designed for insert cases,
		// see comments in issue:
		// https://github.com/pingcap/tidb/issues/54492#issuecomment-2229941881
		return table.DupKeyCheckLazy
	default:
		return table.DupKeyCheckInPlace
	}
}
Loading