From bfb479d4d9055e3163c5a1c4d8f317be8ad03701 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Wed, 7 Aug 2024 14:04:13 +0800 Subject: [PATCH] table: Add option `DupKeyCheckLazy` to check duplicated key lazily --- pkg/executor/builder.go | 2 + pkg/executor/insert.go | 74 +++++++++++++++++++----- pkg/executor/insert_common.go | 4 -- pkg/executor/load_data.go | 9 ++- pkg/executor/replace.go | 11 ++-- pkg/executor/update.go | 37 +++++++++--- pkg/executor/write.go | 8 +-- pkg/planner/core/common_plans.go | 3 + pkg/planner/core/logical_plan_builder.go | 1 + pkg/planner/core/planbuilder.go | 1 + pkg/planner/core/point_get_plan.go | 1 + pkg/sessionctx/variable/BUILD.bazel | 1 - pkg/sessionctx/variable/session.go | 9 --- pkg/table/table.go | 14 ++--- pkg/table/tables/index.go | 8 +-- pkg/table/tables/tables.go | 2 +- pkg/table/tables/tables_test.go | 30 ++++------ 17 files changed, 137 insertions(+), 78 deletions(-) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 6bcee9f5f8be3..3c9af512486c2 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -999,6 +999,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor { insert := &InsertExec{ InsertValues: ivs, OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...), + IgnoreErr: v.IgnoreErr, } return insert } @@ -2650,6 +2651,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) exec.Executor { tblID2table: tblID2table, tblColPosInfos: v.TblColPosInfos, assignFlag: assignFlag, + IgnoreError: v.IgnoreError, } updateExec.fkChecks, b.err = buildTblID2FKCheckExecs(b.ctx, tblID2table, v.FKChecks) if b.err != nil { diff --git a/pkg/executor/insert.go b/pkg/executor/insert.go index 06c0fef9704e6..64dfd15953205 100644 --- a/pkg/executor/insert.go +++ b/pkg/executor/insert.go @@ -22,7 +22,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" @@ -31,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -47,6 +47,7 @@ import ( type InsertExec struct { *InsertValues OnDuplicate []*expression.Assignment + IgnoreErr bool evalBuffer4Dup chunk.MutRow curInsertVals chunk.MutRow row4Update []types.Datum @@ -66,7 +67,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { // If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode. sessVars := e.Ctx().GetSessionVars() defer sessVars.CleanBuffers() - ignoreErr := sessVars.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError txn, err := e.Ctx().Txn(true) if err != nil { @@ -92,13 +92,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { if err != nil { return err } - } else if ignoreErr { + } else if e.IgnoreErr { err := e.batchCheckAndInsert(ctx, rows, e.addRecord, false) if err != nil { return err } } else { start := time.Now() + dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessVars, txn) for i, row := range rows { var err error sizeHintStep := int(sessVars.ShardAllocateStep) @@ -108,9 +109,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { if sizeHint > remain { sizeHint = remain } - err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault) + err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck) } else { - err = e.addRecord(ctx, row, table.DupKeyCheckDefault) + err = e.addRecord(ctx, row, dupKeyCheck) } if err != nil { return err @@ -193,7 +194,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction } // updateDupRow updates a duplicate row to a new row. -func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment) error { +func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode) error { oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs) if err != nil { return err @@ -204,7 +205,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch] } - err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch) + err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck) if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) { ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx() return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err) @@ -236,6 +237,34 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D e.stats.Prefetch += time.Since(prefetchStart) } + // If the current row has some conflicts, the operation will be changed to update. + // If this happens, there still may be another index that has a conflict, + // so we need to determine DupKeyCheckMode here. + updateDupKeyCheck := table.DupKeyCheckInPlace + if (txn.IsPessimistic() && !e.IgnoreErr) || txn.IsPipelined() { + // - If `txn.Pipelined()`, it means current is using `@@tidb_dml_type="bulk"` to insert rows. + // `DupKeyCheckLazy` should be used in "bulk" mode to avoid request storage and improve the performance. + // - If `txn.IsPessimistic()`, we can use `DupKeyCheckLazy` to postpone the storage constraints check + // to subsequence stages such as lock. + // However, if the current statement is `INSERT IGNORE ... ON DUPLICATE KEY ...`, + // `DupKeyCheckInPlace` should be used. + // It is because the executor should get the dup-key error immediately and ignore it. + // - If the current txn is optimistic, `DupKeyCheckInPlace` is always used + // even if `tidb_constraint_check_in_place` is `OFF`. + // This is because `tidb_constraint_check_in_place` is only designed for insert cases, see comments in issue: + // https://github.com/pingcap/tidb/issues/54492#issuecomment-2229941881 + // Though it is still in an insert statement, but it seems some old tests still think it should + // check constraints in place, see test: + // https://github.com/pingcap/tidb/blob/3117d3fae50bbb5dabcde7b9589f92bfbbda5dc6/pkg/executor/test/writetest/write_test.go#L419-L426 + updateDupKeyCheck = table.DupKeyCheckLazy + } + + // Do not use `updateDupKeyCheck` for `AddRecord` because it is not optimized for insert. + // It seems that we can just use `DupKeyCheckSkip` here because all constraints are checked. + // But we still use `optimizeDupKeyCheckForNormalInsert` to make the refactor same behavior with the original code. + // TODO: just use `DupKeyCheckSkip` here. + addRecordDupKeyCheck := optimizeDupKeyCheckForNormalInsert(e.Ctx().GetSessionVars(), txn) + for i, r := range toBeCheckedRows { if r.handleKey != nil { handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey) @@ -243,7 +272,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D return err } - err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) + err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck) if err == nil { continue } @@ -260,7 +289,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D if handle == nil { continue } - err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) + err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck) if err != nil { if kv.IsErrNotFound(err) { // Data index inconsistent? A unique key provide the handle information, but the @@ -282,7 +311,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D // and key-values should be filled back to dupOldRowValues for the further row check, // due to there may be duplicate keys inside the insert statement. if newRows[i] != nil { - err := e.addRecord(ctx, newRows[i], table.DupKeyCheckDefault) + err := e.addRecord(ctx, newRows[i], addRecordDupKeyCheck) if err != nil { return err } @@ -294,6 +323,25 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D return nil } +// optimizeDupKeyCheckForNormalInsert trys to optimize the DupKeyCheckMode for an insert statement according to the +// transaction and system variables. +// If the DupKeyCheckMode of the current statement can be optimized, it will return `DupKeyCheckLazy` to avoid the +// redundant requests to TiKV, otherwise, `DupKeyCheckInPlace` will be returned. +// This method only works for "normal" insert statements, that means the options like "IGNORE" and "ON DUPLICATE KEY" +// in a statement are not considerate, and callers should handle the above cases by themselves. +func optimizeDupKeyCheckForNormalInsert(vars *variable.SessionVars, txn kv.Transaction) table.DupKeyCheckMode { + if !vars.ConstraintCheckInPlace || txn.IsPessimistic() || txn.IsPipelined() { + // We can just check duplicated key lazily without keys in storage for the below cases: + // - `txn.Pipelined()` is true. + // It means the user is using `@@tidb_dml_type="bulk"` to insert rows in bulk mode. + // DupKeyCheckLazy should be used to improve the performance. + // - The current transaction is pessimistic. The duplicate key check can be postponed to the lock stage. + // - The current transaction is optimistic but `tidb_constraint_check_in_place` is set to false. + return table.DupKeyCheckLazy + } + return table.DupKeyCheckInPlace +} + // Next implements the Executor Next interface. func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() @@ -379,7 +427,7 @@ func (e *InsertExec) initEvalBuffer4Dup() { // doDupRowUpdate updates the duplicate row. func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum, - extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int) error { + extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode) error { assignFlag := make([]bool, len(e.Table.WritableCols())) // See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values e.curInsertVals.SetDatums(newRow...) @@ -426,7 +474,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo } newData := e.row4Update[:len(oldRow)] - _, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades) + _, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode) if err != nil { return err } @@ -440,7 +488,7 @@ func (e *InsertExec) setMessage() { if e.SelectExec != nil || numRecords > 1 { numWarnings := stmtCtx.WarningCount() var numDuplicates uint64 - if stmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError { + if e.IgnoreErr { // if ignoreErr numDuplicates = numRecords - stmtCtx.CopiedRows() } else { diff --git a/pkg/executor/insert_common.go b/pkg/executor/insert_common.go index b12975dea46dd..eace13b89a296 100644 --- a/pkg/executor/insert_common.go +++ b/pkg/executor/insert_common.go @@ -1412,15 +1412,11 @@ func (e *InsertValues) addRecordWithAutoIDHint( ctx context.Context, row []types.Datum, reserveAutoIDCount int, dupKeyCheck table.DupKeyCheckMode, ) (err error) { vars := e.Ctx().GetSessionVars() - if !vars.ConstraintCheckInPlace { - vars.PresumeKeyNotExists = true - } if reserveAutoIDCount > 0 { _, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck) } else { _, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck) } - vars.PresumeKeyNotExists = false if err != nil { return err } diff --git a/pkg/executor/load_data.go b/pkg/executor/load_data.go index e6471546d36a5..fde54ed25360e 100644 --- a/pkg/executor/load_data.go +++ b/pkg/executor/load_data.go @@ -647,6 +647,11 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type case ast.OnDuplicateKeyHandlingIgnore: return w.batchCheckAndInsert(ctx, rows[0:cnt], w.addRecordLD, false) case ast.OnDuplicateKeyHandlingError: + txn, err := w.Ctx().Txn(true) + if err != nil { + return err + } + dupKeyCheck := optimizeDupKeyCheckForNormalInsert(w.Ctx().GetSessionVars(), txn) for i, row := range rows[0:cnt] { sizeHintStep := int(w.Ctx().GetSessionVars().ShardAllocateStep) if sizeHintStep > 0 && i%sizeHintStep == 0 { @@ -655,9 +660,9 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type if sizeHint > remain { sizeHint = remain } - err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault) + err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck) } else { - err = w.addRecord(ctx, row, table.DupKeyCheckDefault) + err = w.addRecord(ctx, row, dupKeyCheck) } if err != nil { return err diff --git a/pkg/executor/replace.go b/pkg/executor/replace.go index f8934e008d43b..3b9e9b4101d34 100644 --- a/pkg/executor/replace.go +++ b/pkg/executor/replace.go @@ -63,7 +63,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error { } // replaceRow removes all duplicate rows for one row, then inserts it. -func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { +func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow, dupKeyCheck table.DupKeyCheckMode) error { txn, err := e.Ctx().Txn(true) if err != nil { return err @@ -106,7 +106,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { } // No duplicated rows now, insert the row. - err = e.addRecord(ctx, r.row, table.DupKeyCheckDefault) + err = e.addRecord(ctx, r.row, dupKeyCheck) if err != nil { return err } @@ -180,9 +180,12 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { if e.stats != nil { e.stats.Prefetch = time.Since(prefetchStart) } - e.Ctx().GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows))) + sessionVars := e.Ctx().GetSessionVars() + sessionVars.StmtCtx.AddRecordRows(uint64(len(newRows))) + // TODO: seems we can optimize it to `DupKeyCheckSkip` because all conflict rows are deleted in previous steps. + dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessionVars, txn) for _, r := range toBeCheckedRows { - err = e.replaceRow(ctx, r) + err = e.replaceRow(ctx, r, dupKeyCheck) if err != nil { return err } diff --git a/pkg/executor/update.go b/pkg/executor/update.go index 262dea5ab95be..5d71f5bd8edf4 100644 --- a/pkg/executor/update.go +++ b/pkg/executor/update.go @@ -72,6 +72,8 @@ type UpdateExec struct { fkChecks map[int64][]*FKCheckExec // fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec fkCascades map[int64][]*FKCascadeExec + + IgnoreError bool } // prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations. @@ -172,7 +174,7 @@ func (e *UpdateExec) merge(row, newData []types.Datum, mergeGenerated bool) erro return nil } -func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum) error { +func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newData []types.Datum, dupKeyCheck table.DupKeyCheckMode) error { defer trace.StartRegion(ctx, "UpdateExec").End() bAssignFlag := make([]bool, len(e.assignFlag)) for i, flag := range e.assignFlag { @@ -205,7 +207,7 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat // Update row fkChecks := e.fkChecks[content.TblID] fkCascades := e.fkCascades[content.TblID] - changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades) + changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades, dupKeyCheck) if err1 == nil { _, exist := e.updatedRowKeys[content.Start].Get(handle) memDelta := e.updatedRowKeys[content.Start].Set(handle, changed) @@ -273,6 +275,27 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { } memUsageOfChk := int64(0) totalNumRows := 0 + + txn, err := e.Ctx().Txn(true) + if err != nil { + return 0, err + } + + dupKeyCheck := table.DupKeyCheckInPlace + if (txn.IsPessimistic() && !e.IgnoreError) || txn.IsPipelined() { + // - If `txn.Pipelined()`, it means current is using `@@tidb_dml_type="bulk"` to insert rows. + // `DupKeyCheckLazy` should be used in "bulk" mode to avoid request storage and improve the performance. + // - If `txn.IsPessimistic()`, we can use `DupKeyCheckLazy` to postpone the storage constraints check + // to subsequence stages such as lock. + // One exception is `UPDATE IGNORE ...`, `DupKeyCheckInPlace` should be used to ensure executor can get the + // dup-key error immediately and ignore it then. + // - If the current txn is optimistic, `DupKeyCheckInPlace` is always used + // even if `tidb_constraint_check_in_place` is `OFF`. + // This is because `tidb_constraint_check_in_place` is only designed for insert cases, see comments in issue: + // https://github.com/pingcap/tidb/issues/54492#issuecomment-2229941881 + dupKeyCheck = table.DupKeyCheckLazy + } + for { e.memTracker.Consume(-memUsageOfChk) err := exec.Next(ctx, e.Children(0), chk) @@ -286,14 +309,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { memUsageOfChk = chk.MemoryUsage() e.memTracker.Consume(memUsageOfChk) if e.collectRuntimeStatsEnabled() { - txn, err := e.Ctx().Txn(true) - if err == nil && txn.GetSnapshot() != nil { - txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + if snap := txn.GetSnapshot(); snap != nil { + snap.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } - txn, err := e.Ctx().Txn(true) // pipelined dml may already flush in background, don't touch it to avoid race. - if err == nil && !txn.IsPipelined() { + if !txn.IsPipelined() { sc := e.Ctx().GetSessionVars().StmtCtx txn.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger()) if sc.KvExecCounter != nil { @@ -329,7 +350,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { } } // write to table - if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow); err != nil { + if err := e.exec(ctx, e.Children(0).Schema(), datumRow, newRow, dupKeyCheck); err != nil { return 0, err } } diff --git a/pkg/executor/write.go b/pkg/executor/write.go index 32779ad717a3e..984071c916a72 100644 --- a/pkg/executor/write.go +++ b/pkg/executor/write.go @@ -56,7 +56,7 @@ var ( func updateRecord( ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table, - onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, + onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, dupKeyMode table.DupKeyCheckMode, ) (bool, error) { r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord") defer r.End() @@ -179,7 +179,7 @@ func updateRecord( return false, err } - _, err = t.AddRecord(sctx.GetTableCtx(), newData, table.IsUpdate, table.WithCtx(ctx)) + _, err = t.AddRecord(sctx.GetTableCtx(), newData, table.IsUpdate, table.WithCtx(ctx), dupKeyMode) if err != nil { return false, err } @@ -199,9 +199,9 @@ func updateRecord( // If InHandleForeignKeyTrigger or ForeignKeyTriggerCtx.HasFKCascades is true indicate we may have // foreign key cascade need to handle later, then we still need to write index value, // otherwise, the later foreign cascade executor may see data-index inconsistency in txn-mem-buffer. - opts = []table.UpdateRecordOption{table.WithCtx(ctx)} + opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode} } else { - opts = []table.UpdateRecordOption{table.WithCtx(ctx), table.SkipWriteUntouchedIndices} + opts = []table.UpdateRecordOption{table.WithCtx(ctx), dupKeyMode, table.SkipWriteUntouchedIndices} } // Update record to new value and update index. diff --git a/pkg/planner/core/common_plans.go b/pkg/planner/core/common_plans.go index 4c78407c8363a..8165fee98b229 100644 --- a/pkg/planner/core/common_plans.go +++ b/pkg/planner/core/common_plans.go @@ -371,6 +371,7 @@ type Insert struct { SelectPlan base.PhysicalPlan IsReplace bool + IgnoreErr bool // NeedFillDefaultValue is true when expr in value list reference other column. NeedFillDefaultValue bool @@ -432,6 +433,8 @@ type Update struct { AllAssignmentsAreConstant bool + IgnoreError bool + VirtualAssignmentsOffset int SelectPlan base.PhysicalPlan diff --git a/pkg/planner/core/logical_plan_builder.go b/pkg/planner/core/logical_plan_builder.go index 5b38602437184..c97ded31ea072 100644 --- a/pkg/planner/core/logical_plan_builder.go +++ b/pkg/planner/core/logical_plan_builder.go @@ -5417,6 +5417,7 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( OrderedList: orderedList, AllAssignmentsAreConstant: allAssignmentsAreConstant, VirtualAssignmentsOffset: len(update.List), + IgnoreError: update.IgnoreErr, }.Init(b.ctx) updt.names = p.OutputNames() // We cannot apply projection elimination when building the subplan, because diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index eb88cbbaf1f68..f01b46f74d291 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3701,6 +3701,7 @@ func (b *PlanBuilder) buildInsert(ctx context.Context, insert *ast.InsertStmt) ( tableSchema: schema, tableColNames: names, IsReplace: insert.IsReplace, + IgnoreErr: insert.IgnoreErr, }.Init(b.ctx) if tableInfo.GetPartitionInfo() != nil && len(insert.PartitionNames) != 0 { diff --git a/pkg/planner/core/point_get_plan.go b/pkg/planner/core/point_get_plan.go index 5892910398983..b876de9c99325 100644 --- a/pkg/planner/core/point_get_plan.go +++ b/pkg/planner/core/point_get_plan.go @@ -1950,6 +1950,7 @@ func buildPointUpdatePlan(ctx base.PlanContext, pointPlan base.PhysicalPlan, dbN }, AllAssignmentsAreConstant: allAssignmentsAreConstant, VirtualAssignmentsOffset: len(orderedList), + IgnoreError: updateStmt.IgnoreErr, }.Init(ctx) updatePlan.names = pointPlan.OutputNames() is := ctx.GetInfoSchema().(infoschema.InfoSchema) diff --git a/pkg/sessionctx/variable/BUILD.bazel b/pkg/sessionctx/variable/BUILD.bazel index 53ffdbff2d2b4..900e53d98eaf5 100644 --- a/pkg/sessionctx/variable/BUILD.bazel +++ b/pkg/sessionctx/variable/BUILD.bazel @@ -21,7 +21,6 @@ go_library( deps = [ "//pkg/config", "//pkg/domain/resourcegroup", - "//pkg/errctx", "//pkg/errno", "//pkg/keyspace", "//pkg/kv", diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 8f5b2f795fcf3..b24adf2cc8e87 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain/resourcegroup" - "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser" @@ -1265,9 +1264,6 @@ type SessionVars struct { // EnableGlobalIndex indicates whether we could create an global index on a partition table or not. EnableGlobalIndex bool - // PresumeKeyNotExists indicates lazy existence checking is enabled. - PresumeKeyNotExists bool - // EnableParallelApply indicates that whether to use parallel apply. EnableParallelApply bool @@ -2758,11 +2754,6 @@ func (s *SessionVars) GetDivPrecisionIncrement() int { return s.DivPrecisionIncrement } -// LazyCheckKeyNotExists returns if we can lazy check key not exists. -func (s *SessionVars) LazyCheckKeyNotExists() bool { - return s.PresumeKeyNotExists || (s.TxnCtx != nil && s.TxnCtx.IsPessimistic && s.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) == errctx.LevelError) -} - // GetTemporaryTable returns a TempTable by tableInfo. func (s *SessionVars) GetTemporaryTable(tblInfo *model.TableInfo) tableutil.TempTable { if tblInfo.TempTableType != model.TempTableNone { diff --git a/pkg/table/table.go b/pkg/table/table.go index 98b9c155948db..1f4ea4ce7ee22 100644 --- a/pkg/table/table.go +++ b/pkg/table/table.go @@ -246,14 +246,12 @@ var SkipWriteUntouchedIndices UpdateRecordOption = skipWriteUntouchedIndices{} type DupKeyCheckMode uint8 const ( - // DupKeyCheckDefault indicates using the default behavior. - // Currently, this means to use the return value `ctx.LazyCheckKeyNotExists()`. - // If the above method returns true, it will only check the duplicated key in the memory buffer, - // otherwise, it will also check the duplicated key in the storage. - // TODO: add `DupKeyCheckLazy` to indicate only checking the duplicated key in the memory buffer. - // After `DupKeyCheckLazy` added, `DupKeyCheckDefault` will be renamed to `DupKeyCheckInPlace` to force check - // the duplicated key in place. - DupKeyCheckDefault DupKeyCheckMode = iota + // DupKeyCheckInPlace indicates to check the duplicated key in place, both in the memory buffer and storage. + DupKeyCheckInPlace DupKeyCheckMode = iota + // DupKeyCheckLazy indicates to check the duplicated key lazily. + // It means only checking the duplicated key in the memory buffer and checking keys in storage will be postponed + // to the subsequence stage such as lock or commit phase. + DupKeyCheckLazy // DupKeyCheckSkip indicates skipping the duplicated key check. DupKeyCheckSkip ) diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index a015d5043da7a..577ad2aebc9e2 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -271,8 +271,8 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu return nil, err } } - if !opt.IgnoreAssertion && (!untouched) { - if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { + if !opt.IgnoreAssertion && !untouched { + if opt.DupKeyCheck == table.DupKeyCheckLazy && !txn.IsPessimistic() { err = txn.SetAssertion(key, kv.SetAssertUnknown) } else { err = txn.SetAssertion(key, kv.SetAssertNotExist) @@ -288,7 +288,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu if c.tblInfo.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV value, err = txn.Get(ctx, key) - } else if (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && !keyIsTempIdxKey { + } else if opt.DupKeyCheck == table.DupKeyCheckLazy && !keyIsTempIdxKey { // For temp index keys, we can't get the temp value from memory buffer, even if the lazy check is enabled. // Otherwise, it may cause the temp index value to be overwritten, leading to data inconsistency. value, err = txn.GetMemBuffer().GetLocal(ctx, key) @@ -308,7 +308,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu // The index key value is not found or deleted. if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { val := idxVal - lazyCheck := (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && err != nil + lazyCheck := opt.DupKeyCheck == table.DupKeyCheckLazy && err != nil if keyIsTempIdxKey { tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} val = tempVal.Encode(value) diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index b44056c4b2613..eeacf136b3465 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -896,7 +896,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, r []types.Datum, opt * if t.meta.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV _, err = txn.Get(ctx, key) - } else if sctx.GetSessionVars().LazyCheckKeyNotExists() || txn.IsPipelined() { + } else if opt.DupKeyCheck == table.DupKeyCheckLazy { var v []byte v, err = txn.GetMemBuffer().GetLocal(ctx, key) if err != nil { diff --git a/pkg/table/tables/tables_test.go b/pkg/table/tables/tables_test.go index 3ed13935ed5cb..50f4c785e52c4 100644 --- a/pkg/table/tables/tables_test.go +++ b/pkg/table/tables/tables_test.go @@ -1071,13 +1071,10 @@ func TestDupKeyCheckMode(t *testing.T) { } for _, txnMode := range []string{"optimistic", "pessimistic"} { - t.Run(txnMode+" DupKeyCheckDefault(check in place)", func(t *testing.T) { + t.Run(txnMode+" DupKeyCheckInPlace", func(t *testing.T) { defer tk.MustExec("rollback") memBuffer := prepareTxn(txnMode).GetMemBuffer() oldLen, oldSize := memBuffer.Len(), memBuffer.Size() - tk.Session().GetSessionVars().PresumeKeyNotExists = false - tk.Session().GetSessionVars().StmtCtx.SetErrLevels(errctx.LevelMap{errctx.ErrGroupDupKey: errctx.LevelWarn}) - // AddRecord should check dup key in store and memory buffer for _, row := range [][]types.Datum{ types.MakeDatums(1, 5, 6), @@ -1085,8 +1082,8 @@ func TestDupKeyCheckMode(t *testing.T) { types.MakeDatums(21, 31, 41), types.MakeDatums(200, 22, 23), } { - expectAddRecordDupKeyErr(row, table.DupKeyCheckDefault) - // default mode should be DupKeyCheckDefault + expectAddRecordDupKeyErr(row, table.DupKeyCheckInPlace) + // default mode should be DupKeyCheckInPlace expectAddRecordDupKeyErr(row) require.Equal(t, oldLen, memBuffer.Len()) require.Equal(t, oldSize, memBuffer.Size()) @@ -1096,29 +1093,24 @@ func TestDupKeyCheckMode(t *testing.T) { for _, row := range [][][]types.Datum{ {types.MakeDatums(1, 2, 3), types.MakeDatums(1, 12, 13)}, } { - expectUpdateRecordDupKeyErr(row, []bool{false, true, false}, table.DupKeyCheckDefault) - // default mode should be DupKeyCheckDefault + expectUpdateRecordDupKeyErr(row, []bool{false, true, false}, table.DupKeyCheckInPlace) + // default mode should be DupKeyCheckInPlace expectUpdateRecordDupKeyErr(row, []bool{false, true, false}) require.Equal(t, oldLen, memBuffer.Len()) require.Equal(t, oldSize, memBuffer.Size()) } }) - t.Run(txnMode+" DupKeyCheckDefault(lazy)", func(t *testing.T) { + t.Run(txnMode+" DupKeyCheckLazy", func(t *testing.T) { defer tk.MustExec("rollback") memBuffer := prepareTxn(txnMode).GetMemBuffer() oldLen, oldSize := memBuffer.Len(), memBuffer.Size() - tk.Session().GetSessionVars().PresumeKeyNotExists = true - tk.Session().GetSessionVars().StmtCtx.SetErrLevels(errctx.LevelMap{}) - // AddRecord should check dup key in memory buffer for _, row := range [][]types.Datum{ types.MakeDatums(21, 31, 41), types.MakeDatums(200, 22, 23), } { - expectAddRecordDupKeyErr(row, table.DupKeyCheckDefault) - // if DupKeyCheckMode mode specified, table.DupKeyCheckDefault should be used - expectAddRecordDupKeyErr(row) + expectAddRecordDupKeyErr(row, table.DupKeyCheckLazy) require.Equal(t, oldLen, memBuffer.Len()) require.Equal(t, oldSize, memBuffer.Size()) } @@ -1127,9 +1119,7 @@ func TestDupKeyCheckMode(t *testing.T) { for _, row := range [][][]types.Datum{ {types.MakeDatums(21, 22, 23), types.MakeDatums(21, 32, 35)}, } { - expectUpdateRecordDupKeyErr(row, []bool{false, true, false}, table.DupKeyCheckDefault) - // default mode should be DupKeyCheckDefault - expectUpdateRecordDupKeyErr(row, []bool{false, true, false}) + expectUpdateRecordDupKeyErr(row, []bool{false, true, false}, table.DupKeyCheckLazy) require.Equal(t, oldLen, memBuffer.Len()) require.Equal(t, oldSize, memBuffer.Size()) } @@ -1139,7 +1129,7 @@ func TestDupKeyCheckMode(t *testing.T) { for _, row := range [][]types.Datum{ types.MakeDatums(1, 12, 13), } { - h := expectAddRecordSucc(row, table.DupKeyCheckDefault) + h := expectAddRecordSucc(row, table.DupKeyCheckLazy) // 1 row and 2 indices added require.Equal(t, curLen+3, memBuffer.Len()) curLen = memBuffer.Len() @@ -1158,7 +1148,7 @@ func TestDupKeyCheckMode(t *testing.T) { for _, row := range [][][]types.Datum{ {types.MakeDatums(11, 12, 13), types.MakeDatums(11, 2, 33)}, } { - h := expectUpdateRecordSucc(row, []bool{false, true, true}, table.DupKeyCheckDefault) + h := expectUpdateRecordSucc(row, []bool{false, true, true}, table.DupKeyCheckLazy) // 1 row overridden. 2 indexes deleted and re-added. require.Equal(t, curLen+4, memBuffer.Len()) curLen = memBuffer.Len()