Skip to content

Commit

Permalink
executor: ignore foreign key error in UPDATE/INSERT/DELETE ignore (#…
Browse files Browse the repository at this point in the history
…56682)

close #39712, close #56678, close #56681
  • Loading branch information
YangKeao authored Oct 28, 2024
1 parent 5ab6738 commit 2de388b
Show file tree
Hide file tree
Showing 17 changed files with 288 additions and 24 deletions.
3 changes: 2 additions & 1 deletion pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor {
hasRefCols: v.NeedFillDefaultValue,
SelectExec: selectExec,
rowLen: v.RowLen,
ignoreErr: v.IgnoreErr,
}
err := ivs.initInsertColumns()
if err != nil {
Expand All @@ -995,7 +996,6 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor {
insert := &InsertExec{
InsertValues: ivs,
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
IgnoreErr: v.IgnoreErr,
}
return insert
}
Expand Down Expand Up @@ -2702,6 +2702,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) exec.Executor {
tblID2Table: tblID2table,
IsMultiTable: v.IsMultiTable,
tblColPosInfos: v.TblColPosInfos,
ignoreErr: v.IgnoreErr,
}
deleteExec.fkChecks, b.err = buildTblID2FKCheckExecs(b.ctx, tblID2table, v.FKChecks)
if b.err != nil {
Expand Down
46 changes: 38 additions & 8 deletions pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type DeleteExec struct {
fkChecks map[int64][]*FKCheckExec
// fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec
fkCascades map[int64][]*FKCascadeExec

ignoreErr bool
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -100,6 +102,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
return errors.New("schema columns and fields mismatch")
}
memUsageOfChk := int64(0)

for {
e.memTracker.Consume(-memUsageOfChk)
iter := chunk.NewIterator4Chunk(chk)
Expand Down Expand Up @@ -128,6 +131,18 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
datumRow = append(datumRow, datum)
}

if e.ignoreErr {
ignored, err := checkFKIgnoreErr(ctx, e.Ctx(), e.fkChecks[tbl.Meta().ID], datumRow)
if err != nil {
return err
}

// meets an error, skip this row.
if ignored {
datumRow = datumRow[:0]
continue
}
}
err = e.deleteOneRow(tbl, colPosInfo, isExtraHandle, datumRow)
if err != nil {
return err
Expand Down Expand Up @@ -223,13 +238,26 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
}
}
}
return e.removeRowsInTblRowMap(tblRowMap)
return e.removeRowsInTblRowMap(ctx, tblRowMap)
}

func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error {
func (e *DeleteExec) removeRowsInTblRowMap(ctx context.Context, tblRowMap tableRowMapType) error {
for id, rowMap := range tblRowMap {
var err error
rowMap.Range(func(h kv.Handle, val handleInfoPair) bool {
if e.ignoreErr {
var ignored bool
ignored, err = checkFKIgnoreErr(ctx, e.Ctx(), e.fkChecks[id], val.handleVal)
if err != nil {
return false
}

// meets an error, skip this row.
if ignored {
return true
}
}

err = e.removeRow(e.Ctx(), e.tblID2Table[id], h, val.handleVal, val.posInfo)
return err == nil
})
Expand All @@ -251,20 +279,22 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
return err
}
tid := t.Meta().ID
err = onRemoveRowForFK(ctx, data, e.fkChecks[tid], e.fkCascades[tid])
err = onRemoveRowForFK(ctx, data, e.fkChecks[tid], e.fkCascades[tid], e.ignoreErr)
if err != nil {
return err
}
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return nil
}

func onRemoveRowForFK(ctx sessionctx.Context, data []types.Datum, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec) error {
func onRemoveRowForFK(ctx sessionctx.Context, data []types.Datum, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, ignore bool) error {
sc := ctx.GetSessionVars().StmtCtx
for _, fkc := range fkChecks {
err := fkc.deleteRowNeedToCheck(sc, data)
if err != nil {
return err
if !ignore {
for _, fkc := range fkChecks {
err := fkc.deleteRowNeedToCheck(sc, data)
if err != nil {
return err
}
}
}
for _, fkc := range fkCascades {
Expand Down
45 changes: 45 additions & 0 deletions pkg/executor/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func TestDeleteLockKey(t *testing.T) {
Expand Down Expand Up @@ -106,3 +107,47 @@ func TestDeleteLockKey(t *testing.T) {
}
wg.Wait()
}

func TestDeleteIgnoreWithFK(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table parent (a int primary key)")
tk.MustExec("create table child (a int, foreign key (a) references parent(a))")

tk.MustExec("insert into parent values (1), (2)")
tk.MustExec("insert into child values (1)")

// Delete the row in parent table will fail
require.NotNil(t, tk.ExecToErr("delete from parent where a = 1"))

// Delete ignore will return no error
tk.MustExec("delete ignore from parent where a = 1")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1451 Cannot delete or update a parent row: a foreign key constraint fails (`test`.`child`, CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `parent` (`a`))"))

// Other rows will be deleted successfully
tk.MustExec("delete ignore from parent")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1451 Cannot delete or update a parent row: a foreign key constraint fails (`test`.`child`, CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `parent` (`a`))"))
tk.MustQuery("select * from parent").Check(testkit.Rows("1"))

tk.MustExec("insert into parent values (2)")
// Delete multiple tables
tk.MustExec("create table parent2 (a int primary key)")
tk.MustExec("create table child2 (a int, foreign key (a) references parent2(a))")
tk.MustExec("insert into parent2 values (1), (2)")
tk.MustExec("insert into child2 values (1)")
require.NotNil(t, tk.ExecToErr("delete from parent, parent2 using parent inner join parent2 where parent.a = parent2.a"))
tk.MustExec("delete ignore from parent, parent2 using parent inner join parent2 where parent.a = parent2.a")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1451 Cannot delete or update a parent row: a foreign key constraint fails (`test`.`child`, CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `parent` (`a`))",
"Warning 1451 Cannot delete or update a parent row: a foreign key constraint fails (`test`.`child2`, CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `parent2` (`a`))"))
tk.MustQuery("select * from parent").Check(testkit.Rows("1"))
tk.MustQuery("select * from parent2").Check(testkit.Rows("1"))

// Test batch on delete
require.NotNil(t, tk.ExecToErr("batch on `a` limit 1000 delete from parent where a = 1"))
tk.MustExec("batch on `a` limit 1000 delete ignore from parent where a = 1")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1451 Cannot delete or update a parent row: a foreign key constraint fails (`test`.`child`, CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `parent` (`a`))"))
}
24 changes: 24 additions & 0 deletions pkg/executor/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,30 @@ func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementConte
return nil
}

// checkFKIgnoreErr will use `fkc.checkRows` to check the rows. The `fkc.checkRows` will ignore the error and append the error as warning to the statement context.
// It'll return whether an error has been ignored. If an error has been ignored, it'll return `true, nil`.
func checkFKIgnoreErr(ctx context.Context, sctx sessionctx.Context, fkChecks []*FKCheckExec, row []types.Datum) (bool, error) {
txn, err := sctx.Txn(true)
if err != nil {
return false, err
}

fkToBeCheckedRows := [1]toBeCheckedRow{{row: row, ignored: false}}

for _, fkc := range fkChecks {
err := fkc.checkRows(ctx, sctx.GetSessionVars().StmtCtx, txn, fkToBeCheckedRows[:])
if err != nil {
return false, err
}
}

if fkToBeCheckedRows[0].ignored {
return true, nil
}

return false, nil
}

func (b *executorBuilder) buildTblID2FKCascadeExecs(tblID2Table map[int64]table.Table, tblID2FKCascades map[int64][]*plannercore.FKCascade) (map[int64][]*FKCascadeExec, error) {
fkCascadesMap := make(map[int64][]*FKCascadeExec)
for tid, tbl := range tblID2Table {
Expand Down
20 changes: 15 additions & 5 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
type InsertExec struct {
*InsertValues
OnDuplicate []*expression.Assignment
IgnoreErr bool
evalBuffer4Dup chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum
Expand Down Expand Up @@ -92,7 +91,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if err != nil {
return err
}
} else if e.IgnoreErr {
} else if e.ignoreErr {
err := e.batchCheckAndInsert(ctx, rows, e.addRecord, false)
if err != nil {
return err
Expand Down Expand Up @@ -242,7 +241,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// Though it is in an insert statement, `ON DUP KEY UPDATE` follows the dup-key check behavior of update.
// For example, it will ignore variable `tidb_constraint_check_in_place`, see the test case:
// https://github.com/pingcap/tidb/blob/3117d3fae50bbb5dabcde7b9589f92bfbbda5dc6/pkg/executor/test/writetest/write_test.go#L419-L426
updateDupKeyCheck := optimizeDupKeyCheckForUpdate(txn, e.IgnoreErr)
updateDupKeyCheck := optimizeDupKeyCheckForUpdate(txn, e.ignoreErr)
// Do not use `updateDupKeyCheck` for `AddRecord` because it is not optimized for insert.
// It seems that we can just use `DupKeyCheckSkip` here because all constraints are checked.
// But we still use `optimizeDupKeyCheckForNormalInsert` to make the refactor same behavior with the original code.
Expand Down Expand Up @@ -479,7 +478,18 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode)
if e.ignoreErr {
ignored, err := checkFKIgnoreErr(ctx, e.Ctx(), e.fkChecks, newData)
if err != nil {
return err
}

// meets an error, skip this row.
if ignored {
return nil
}
}
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode, e.ignoreErr)
if err != nil {
return err
}
Expand All @@ -504,7 +514,7 @@ func (e *InsertExec) setMessage() {
if e.SelectExec != nil || numRecords > 1 {
numWarnings := stmtCtx.WarningCount()
var numDuplicates uint64
if e.IgnoreErr {
if e.ignoreErr {
// if ignoreErr
numDuplicates = numRecords - stmtCtx.CopiedRows()
} else {
Expand Down
4 changes: 3 additions & 1 deletion pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type InsertValues struct {
// fkChecks contains the foreign key checkers.
fkChecks []*FKCheckExec
fkCascades []*FKCascadeExec

ignoreErr bool
}

type defaultVal struct {
Expand Down Expand Up @@ -1370,7 +1372,7 @@ func (e *InsertValues) removeRow(
if err != nil {
return false, err
}
err = onRemoveRowForFK(e.Ctx(), oldRow, e.fkChecks, e.fkCascades)
err = onRemoveRowForFK(e.Ctx(), oldRow, e.fkChecks, e.fkCascades, e.ignoreErr)
if err != nil {
return false, err
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,19 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat
// Update row
fkChecks := e.fkChecks[content.TblID]
fkCascades := e.fkCascades[content.TblID]
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades, dupKeyCheck)

if e.IgnoreError {
ignored, err := checkFKIgnoreErr(ctx, e.Ctx(), e.fkChecks[tbl.Meta().ID], newTableData)
if err != nil {
return err
}

// meets an error, skip this row.
if ignored {
continue
}
}
changed, err1 := updateRecord(ctx, e.Ctx(), handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks, fkCascades, dupKeyCheck, e.IgnoreError)
if err1 == nil {
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
Expand Down
12 changes: 7 additions & 5 deletions pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var (
func updateRecord(
ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool,
t table.Table,
onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, dupKeyMode table.DupKeyCheckMode,
onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, dupKeyMode table.DupKeyCheckMode, ignoreErr bool,
) (bool, error) {
r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord")
defer r.End()
Expand Down Expand Up @@ -221,10 +221,12 @@ func updateRecord(
}
}
}
for _, fkt := range fkChecks {
err := fkt.updateRowNeedToCheck(sc, oldData, newData)
if err != nil {
return false, err
if !ignoreErr {
for _, fkt := range fkChecks {
err := fkt.updateRowNeedToCheck(sc, oldData, newData)
if err != nil {
return false, err
}
}
}
for _, fkc := range fkCascades {
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,8 @@ type Delete struct {

FKChecks map[int64][]*FKCheck `plan-cache-clone:"must-nil"`
FKCascades map[int64][]*FKCascade `plan-cache-clone:"must-nil"`

IgnoreErr bool
}

// MemoryUsage return the memory usage of Delete
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5945,6 +5945,7 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, ds *ast.DeleteStmt) (base

del := Delete{
IsMultiTable: ds.IsMultiTable,
IgnoreErr: ds.IgnoreErr,
}.Init(b.ctx)

localResolveCtx := resolve.NewContext()
Expand Down
7 changes: 4 additions & 3 deletions pkg/planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2040,18 +2040,18 @@ func tryDeletePointPlan(ctx base.PlanContext, delStmt *ast.DeleteStmt, resolveCt
if ctx.GetSessionVars().TxnCtx.IsPessimistic {
pointGet.Lock, pointGet.LockWaitTime = getLockWaitTime(ctx, &ast.SelectLockInfo{LockType: ast.SelectLockForUpdate})
}
return buildPointDeletePlan(ctx, pointGet, pointGet.dbName, pointGet.TblInfo)
return buildPointDeletePlan(ctx, pointGet, pointGet.dbName, pointGet.TblInfo, delStmt.IgnoreErr)
}
if batchPointGet := tryWhereIn2BatchPointGet(ctx, selStmt, resolveCtx); batchPointGet != nil {
if ctx.GetSessionVars().TxnCtx.IsPessimistic {
batchPointGet.Lock, batchPointGet.LockWaitTime = getLockWaitTime(ctx, &ast.SelectLockInfo{LockType: ast.SelectLockForUpdate})
}
return buildPointDeletePlan(ctx, batchPointGet, batchPointGet.dbName, batchPointGet.TblInfo)
return buildPointDeletePlan(ctx, batchPointGet, batchPointGet.dbName, batchPointGet.TblInfo, delStmt.IgnoreErr)
}
return nil
}

func buildPointDeletePlan(ctx base.PlanContext, pointPlan base.PhysicalPlan, dbName string, tbl *model.TableInfo) base.Plan {
func buildPointDeletePlan(ctx base.PlanContext, pointPlan base.PhysicalPlan, dbName string, tbl *model.TableInfo, ignoreErr bool) base.Plan {
if checkFastPlanPrivilege(ctx, dbName, tbl.Name.L, mysql.SelectPriv, mysql.DeletePriv) != nil {
return nil
}
Expand All @@ -2071,6 +2071,7 @@ func buildPointDeletePlan(ctx base.PlanContext, pointPlan base.PhysicalPlan, dbN
delPlan := Delete{
SelectPlan: pointPlan,
TblColPosInfos: []TblColPosInfo{colPosInfo},
IgnoreErr: ignoreErr,
}.Init(ctx)
tblID2Table := map[int64]table.Table{tbl.ID: t}
err = delPlan.buildOnDeleteFKTriggers(ctx, is, tblID2Table)
Expand Down
Loading

0 comments on commit 2de388b

Please sign in to comment.