Skip to content

Commit

Permalink
table: Add option DupKeyCheckLazy to check duplicated key lazily
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Aug 7, 2024
1 parent 3dfa15c commit efc9de6
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 78 deletions.
2 changes: 2 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor {
insert := &InsertExec{
InsertValues: ivs,
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
IgnoreErr: v.IgnoreErr,
}
return insert
}
Expand Down Expand Up @@ -2650,6 +2651,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) exec.Executor {
tblID2table: tblID2table,
tblColPosInfos: v.TblColPosInfos,
assignFlag: assignFlag,
IgnoreError: v.IgnoreError,
}
updateExec.fkChecks, b.err = buildTblID2FKCheckExecs(b.ctx, tblID2table, v.FKChecks)
if b.err != nil {
Expand Down
62 changes: 49 additions & 13 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand All @@ -47,6 +47,7 @@ import (
type InsertExec struct {
*InsertValues
OnDuplicate []*expression.Assignment
IgnoreErr bool
evalBuffer4Dup chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum
Expand All @@ -66,7 +67,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
sessVars := e.Ctx().GetSessionVars()
defer sessVars.CleanBuffers()
ignoreErr := sessVars.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError

txn, err := e.Ctx().Txn(true)
if err != nil {
Expand All @@ -92,13 +92,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if err != nil {
return err
}
} else if ignoreErr {
} else if e.IgnoreErr {
err := e.batchCheckAndInsert(ctx, rows, e.addRecord, false)
if err != nil {
return err
}
} else {
start := time.Now()
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessVars, txn)
for i, row := range rows {
var err error
sizeHintStep := int(sessVars.ShardAllocateStep)
Expand All @@ -108,9 +109,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if sizeHint > remain {
sizeHint = remain
}
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck)
} else {
err = e.addRecord(ctx, row, table.DupKeyCheckDefault)
err = e.addRecord(ctx, row, dupKeyCheck)
}
if err != nil {
return err
Expand Down Expand Up @@ -193,7 +194,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand All @@ -204,7 +205,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch]
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch)
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck)
if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err)
Expand Down Expand Up @@ -236,14 +237,29 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
e.stats.Prefetch += time.Since(prefetchStart)
}

// If the current row has some conflicts, the operation will be changed to update.
// If this happens, there still may be another index that has a conflict,
// so we need to determine DupKeyCheckMode here.
var dupKeyCheck table.DupKeyCheckMode
if e.IgnoreErr && !txn.IsPipelined() {
// If the current statement is `INSERT IGNORE ... ON DUPLICATE KEY ...`, `DupKeyCheckInPlace` should be used.
// It is because the executor should get the dup-key error immediately and ignore it.
// However, if `txn.IsPipelined()` returns true, we are forced to use `DupKeyCheckLazy` for performance,
// even though the dup-key error may then not be detected and returned to the user in time.
// See comment in issue: https://github.com/pingcap/tidb/issues/55187#issuecomment-2268356459
dupKeyCheck = table.DupKeyCheckInPlace
} else {
dupKeyCheck = optimizeDupKeyCheckForNormalInsert(e.Ctx().GetSessionVars(), txn)
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, dupKeyCheck)
if err == nil {
continue
}
Expand All @@ -260,7 +276,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if handle == nil {
continue
}
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, dupKeyCheck)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand All @@ -282,7 +298,8 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
err := e.addRecord(ctx, newRows[i], table.DupKeyCheckDefault)
// TODO: seems we can optimize it to `DupKeyCheckSkip` because all constraints are checked in previous steps.
err := e.addRecord(ctx, newRows[i], dupKeyCheck)
if err != nil {
return err
}
Expand All @@ -294,6 +311,25 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return nil
}

// optimizeDupKeyCheckForNormalInsert tries to pick the best DupKeyCheckMode for an insert
// statement according to the current transaction and session variables.
// It returns `DupKeyCheckLazy` when the duplicated-key check can safely be deferred, avoiding
// redundant requests to TiKV; otherwise it returns `DupKeyCheckInPlace`.
// This helper only covers "normal" insert statements: modifiers such as "IGNORE" and
// "ON DUPLICATE KEY" are not considered here, and callers must handle those cases themselves.
func optimizeDupKeyCheckForNormalInsert(vars *variable.SessionVars, txn kv.Transaction) table.DupKeyCheckMode {
	// The in-place check is only required for an optimistic transaction running with
	// `tidb_constraint_check_in_place` enabled. In every other case the check can be postponed:
	//   - `txn.IsPipelined()` is true: the user is inserting rows with `@@tidb_dml_type="bulk"`,
	//     where lazy checking is preferred for performance.
	//   - The transaction is pessimistic: the duplicated-key check can be delayed to the lock stage.
	//   - The transaction is optimistic but `tidb_constraint_check_in_place` is set to false.
	if vars.ConstraintCheckInPlace && !txn.IsPessimistic() && !txn.IsPipelined() {
		return table.DupKeyCheckInPlace
	}
	return table.DupKeyCheckLazy
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
Expand Down Expand Up @@ -379,7 +415,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int) error {
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand Down Expand Up @@ -426,7 +462,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades)
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode)
if err != nil {
return err
}
Expand All @@ -440,7 +476,7 @@ func (e *InsertExec) setMessage() {
if e.SelectExec != nil || numRecords > 1 {
numWarnings := stmtCtx.WarningCount()
var numDuplicates uint64
if stmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError {
if e.IgnoreErr {
// if ignoreErr
numDuplicates = numRecords - stmtCtx.CopiedRows()
} else {
Expand Down
4 changes: 0 additions & 4 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,15 +1412,11 @@ func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int, dupKeyCheck table.DupKeyCheckMode,
) (err error) {
vars := e.Ctx().GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
}
if reserveAutoIDCount > 0 {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck)
} else {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck)
}
vars.PresumeKeyNotExists = false
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,11 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
case ast.OnDuplicateKeyHandlingIgnore:
return w.batchCheckAndInsert(ctx, rows[0:cnt], w.addRecordLD, false)
case ast.OnDuplicateKeyHandlingError:
txn, err := w.Ctx().Txn(true)
if err != nil {
return err
}
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(w.Ctx().GetSessionVars(), txn)
for i, row := range rows[0:cnt] {
sizeHintStep := int(w.Ctx().GetSessionVars().ShardAllocateStep)
if sizeHintStep > 0 && i%sizeHintStep == 0 {
Expand All @@ -655,9 +660,9 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
if sizeHint > remain {
sizeHint = remain
}
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck)
} else {
err = w.addRecord(ctx, row, table.DupKeyCheckDefault)
err = w.addRecord(ctx, row, dupKeyCheck)
}
if err != nil {
return err
Expand Down
11 changes: 7 additions & 4 deletions pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow, dupKeyCheck table.DupKeyCheckMode) error {
txn, err := e.Ctx().Txn(true)
if err != nil {
return err
Expand Down Expand Up @@ -106,7 +106,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
err = e.addRecord(ctx, r.row, table.DupKeyCheckDefault)
err = e.addRecord(ctx, r.row, dupKeyCheck)
if err != nil {
return err
}
Expand Down Expand Up @@ -180,9 +180,12 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
if e.stats != nil {
e.stats.Prefetch = time.Since(prefetchStart)
}
e.Ctx().GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
sessionVars := e.Ctx().GetSessionVars()
sessionVars.StmtCtx.AddRecordRows(uint64(len(newRows)))
// TODO: seems we can optimize it to `DupKeyCheckSkip` because all conflict rows are deleted in previous steps.
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessionVars, txn)
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
err = e.replaceRow(ctx, r, dupKeyCheck)
if err != nil {
return err
}
Expand Down
37 changes: 29 additions & 8 deletions pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type UpdateExec struct {
fkChecks map[int64][]*FKCheckExec
// fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec
fkCascades map[int64][]*FKCascadeExec

IgnoreError bool
}

// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
Expand Down Expand Up @@ -172,7 +174,7 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro
return nil
}

func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum) error {
func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum, dupKeyCheck table.DupKeyCheckMode) error {
defer trace.StartRegion(ctx, "UpdateExec").End()
bAssignFlag := make([]bool, len(e.assignFlag))
for i, flag := range e.assignFlag {
Expand Down Expand Up @@ -205,7 +207,7 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat
// Update row
fkChecks := e.fkChecks[content.TblID]
fkCascades := e.fkCascades[content.TblID]
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades)
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades, dupKeyCheck)
if err1 == nil {
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
Expand Down Expand Up @@ -273,6 +275,27 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
memUsageOfChk := int64(0)
totalNumRows := 0

txn, err := e.Ctx().Txn(true)
if err != nil {
return 0, err
}

dupKeyCheck := table.DupKeyCheckInPlace
if (txn.IsPessimistic() && !e.IgnoreError) || txn.IsPipelined() {
// - If `txn.IsPipelined()`, it means current is using `@@tidb_dml_type="bulk"` to insert rows.
// `DupKeyCheckLazy` should be used in "bulk" mode to avoid request storage and improve the performance.
// - If `txn.IsPessimistic()`, we can use `DupKeyCheckLazy` to postpone the storage constraints check
// to subsequence stages such as lock.
// One exception is `UPDATE IGNORE ...`, `DupKeyCheckInPlace` should be used to ensure executor can get the
// dup-key error immediately and ignore it then.
// - If the current txn is optimistic, `DupKeyCheckInPlace` is always used
// even if `tidb_constraint_check_in_place` is `OFF`.
// This is because `tidb_constraint_check_in_place` is only designed for insert cases, see comments in issue:
// https://github.com/pingcap/tidb/issues/54492#issuecomment-2229941881
dupKeyCheck = table.DupKeyCheckLazy
}

for {
e.memTracker.Consume(-memUsageOfChk)
err := exec.Next(ctx, e.Children(0), chk)
Expand All @@ -286,14 +309,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
if e.collectRuntimeStatsEnabled() {
txn, err := e.Ctx().Txn(true)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
if snap := txn.GetSnapshot(); snap != nil {
snap.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
txn, err := e.Ctx().Txn(true)
// pipelined dml may already flush in background, don't touch it to avoid race.
if err == nil && !txn.IsPipelined() {
if !txn.IsPipelined() {
sc := e.Ctx().GetSessionVars().StmtCtx
txn.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
if sc.KvExecCounter != nil {
Expand Down Expand Up @@ -329,7 +350,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
}
// write to table
if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow); err != nil {
if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow, dupKeyCheck); err != nil {
return 0, err
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var (
func updateRecord(
ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool,
t table.Table,
onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec,
onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, dupKeyMode table.DupKeyCheckMode,
) (bool, error) {
r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord")
defer r.End()
Expand Down Expand Up @@ -179,7 +179,7 @@ func updateRecord(
return false, err
}

_, err = t.AddRecord(sctx.GetTableCtx(), newData, table.IsUpdate, table.WithCtx(ctx))
_, err = t.AddRecord(sctx.GetTableCtx(), newData, table.IsUpdate, table.WithCtx(ctx), dupKeyMode)
if err != nil {
return false, err
}
Expand All @@ -199,9 +199,9 @@ func updateRecord(
// If InHandleForeignKeyTrigger or ForeignKeyTriggerCtx.HasFKCascades is true indicate we may have
// foreign key cascade need to handle later, then we still need to write index value,
// otherwise, the later foreign cascade executor may see data-index inconsistency in txn-mem-buffer.
opts = []table.UpdateRecordOption{table.WithCtx(ctx)}
opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode}
} else {
opts = []table.UpdateRecordOption{table.WithCtx(ctx), table.SkipWriteUntouchedIndices}
opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode, table.SkipWriteUntouchedIndices}
}

// Update record to new value and update index.
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ type Insert struct {
SelectPlan base.PhysicalPlan

IsReplace bool
IgnoreErr bool

// NeedFillDefaultValue is true when expr in value list reference other column.
NeedFillDefaultValue bool
Expand Down Expand Up @@ -432,6 +433,8 @@ type Update struct {

AllAssignmentsAreConstant bool

IgnoreError bool

VirtualAssignmentsOffset int

SelectPlan base.PhysicalPlan
Expand Down
Loading

0 comments on commit efc9de6

Please sign in to comment.