Skip to content

Commit

Permalink
executor: fix mysql_insert_id() for "INSERT .. ON DUPLICATE KEY" stat…
Browse files Browse the repository at this point in the history
…ement (#56514)

close #55965
  • Loading branch information
tiancaiamao authored Oct 11, 2024
1 parent 1946f92 commit 3dfc47f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 5 deletions.
26 changes: 21 additions & 5 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode) error {
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode, autoColIdx int) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand All @@ -205,7 +205,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch]
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck)
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck, autoColIdx)
if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err)
Expand Down Expand Up @@ -249,14 +249,19 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// TODO: just use `DupKeyCheckSkip` here.
addRecordDupKeyCheck := optimizeDupKeyCheckForNormalInsert(e.Ctx().GetSessionVars(), txn)

_, autoColIdx, found := findAutoIncrementColumn(e.Table)
if !found {
autoColIdx = -1
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck, autoColIdx)
if err == nil {
continue
}
Expand All @@ -273,7 +278,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if handle == nil {
continue
}
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck, autoColIdx)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand Down Expand Up @@ -427,7 +432,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode) error {
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode, autoColIdx int) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand Down Expand Up @@ -478,6 +483,17 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
if err != nil {
return err
}

if autoColIdx >= 0 {
if e.Ctx().GetSessionVars().StmtCtx.AffectedRows() > 0 {
// If "INSERT ... ON DUPLICATE KEY UPDATE" duplicate and update a row,
// auto increment value should be set correctly for mysql_insert_id()
// See https://github.com/pingcap/tidb/issues/55965
e.Ctx().GetSessionVars().StmtCtx.InsertID = newData[autoColIdx].GetUint64()
} else {
e.Ctx().GetSessionVars().StmtCtx.InsertID = 0
}
}
return nil
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,68 @@ func TestInsertLockUnchangedKeys(t *testing.T) {
}
}
}

func TestMySQLInsertID(t *testing.T) {
// mysql_insert_id() differs from LAST_INSERT_ID()
// See https://github.com/pingcap/tidb/issues/55965
// mysql_insert_id() is got from tk.Session().LastInsertID()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists tb")
tk.MustExec("create table tb(pk int primary key auto_increment, a int, b int, unique(a))")
defer tk.MustExec("drop table if exists tb")

tk.MustExec("insert into tb (a, b) values (1, 1) on duplicate key update b = values(b)")
require.Equal(t, tk.Session().LastInsertID(), uint64(1))

tk.MustExec("insert into tb (a, b) values (2, 2) on duplicate key update b = values(b)")
require.Equal(t, tk.Session().LastInsertID(), uint64(2))

// If there is an AUTO_INCREMENT column in the table and there were some explicit successfully
// inserted values or some updated values, return the last of the inserted or updated values.
// Ref https://dev.mysql.com/doc/c-api/5.7/en/mysql-insert-id.html#:~:text=When%20called%20after%20an%20INSERT%20...%20ON,of%20the%20inserted%20or%20updated%20values
tk.MustExec("insert into tb (a, b) values (1, 2) on duplicate key update b = values(b)")
require.Equal(t, tk.Session().LastInsertID(), uint64(1))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

tk.MustQuery("select * from tb").Sort().Check(testkit.Rows("1 1 2", "2 2 2"))

// When the new row and the old row are exactly the same (no inserted or updated values), mysql_insert_id() is 0
tk.MustExec("insert into tb (a, b) values (1, 2) on duplicate key update b = 2")
require.Equal(t, tk.Session().LastInsertID(), uint64(0))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

// When the value of auto increment column is assigned explicitly, LAST_INSERT_ID() is unchanged.
// mysql_insert_id() is set to the explicit assigned value.
tk.MustExec("insert into tb values (6, 6, 6)")
require.Equal(t, tk.Session().LastInsertID(), uint64(6))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

// Update statement touches neigher mysql_insert_id() nor LAST_INSERT_ID()
tk.MustExec("update tb set b = 7, pk = pk + 1 where b = 6")
require.Equal(t, tk.Session().LastInsertID(), uint64(0))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

// How to distinguish LAST_INSERT_ID() and mysql_insert_id()?
// In a word, LAST_INSERT_ID() is always get from auto allocated value, while mysql_insert_id() can be
// auto allocated or explicited specified.

// Another scenario mentioned by @lcwangcao
// What's the behaviour when transaction conflict involved?
tk.MustExec("truncate table tb")
tk.MustExec("insert into tb (a, b) values (1, 1), (2, 2)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("begin")
tk1.MustExec("update tb set b = 2 where a = 1")
go func() {
time.Sleep(100 * time.Millisecond)
tk1.MustExec("commit")
}()
// The first time this will update one row.
// Then transaction conflict and retry, in the second time it modify nothing.
tk.MustExec("insert into tb(a, b) values(1,2) on duplicate key update b = 2;")
require.Equal(t, tk.Session().LastInsertID(), uint64(0))
}

0 comments on commit 3dfc47f

Please sign in to comment.