Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix mysql_insert_id() for "INSERT .. ON DUPLICATE KEY" statement (#56514) #58120

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, autoColIdx int) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand All @@ -197,7 +197,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch]
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch)
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, autoColIdx)
if e.Ctx().GetSessionVars().StmtCtx.DupKeyAsWarning && (kv.ErrKeyExists.Equal(err) ||
table.ErrCheckConstraintViolated.Equal(err)) {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err)
Expand Down Expand Up @@ -236,14 +236,19 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
e.stats.Prefetch += time.Since(prefetchStart)
}

_, autoColIdx, found := findAutoIncrementColumn(e.Table)
if !found {
autoColIdx = -1
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, autoColIdx)
if err == nil {
continue
}
Expand All @@ -260,7 +265,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if handle == nil {
continue
}
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, autoColIdx)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand Down Expand Up @@ -381,7 +386,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int) error {
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, autoColIdx int) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand Down Expand Up @@ -430,6 +435,17 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
if err != nil {
return err
}

if autoColIdx >= 0 {
if e.Ctx().GetSessionVars().StmtCtx.AffectedRows() > 0 {
// If "INSERT ... ON DUPLICATE KEY UPDATE" duplicate and update a row,
// auto increment value should be set correctly for mysql_insert_id()
// See https://github.com/pingcap/tidb/issues/55965
e.Ctx().GetSessionVars().StmtCtx.InsertID = newData[autoColIdx].GetUint64()
} else {
e.Ctx().GetSessionVars().StmtCtx.InsertID = 0
}
}
return nil
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,3 +1628,68 @@ func TestUnsignedDecimalFloatInsertNegative(t *testing.T) {
tk.MustExec("insert into tf values('-100')")
tk.MustQuery("select * from tf").Check(testkit.Rows("0"))
}

func TestMySQLInsertID(t *testing.T) {
// mysql_insert_id() differs from LAST_INSERT_ID()
// See https://github.com/pingcap/tidb/issues/55965
// mysql_insert_id() is got from tk.Session().LastInsertID()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists tb")
tk.MustExec("create table tb(pk int primary key auto_increment, a int, b int, unique(a))")
defer tk.MustExec("drop table if exists tb")

tk.MustExec("insert into tb (a, b) values (1, 1) on duplicate key update b = values(b)")
require.Equal(t, tk.Session().LastInsertID(), uint64(1))

tk.MustExec("insert into tb (a, b) values (2, 2) on duplicate key update b = values(b)")
require.Equal(t, tk.Session().LastInsertID(), uint64(2))

// If there is an AUTO_INCREMENT column in the table and there were some explicit successfully
// inserted values or some updated values, return the last of the inserted or updated values.
// Ref https://dev.mysql.com/doc/c-api/5.7/en/mysql-insert-id.html#:~:text=When%20called%20after%20an%20INSERT%20...%20ON,of%20the%20inserted%20or%20updated%20values
tk.MustExec("insert into tb (a, b) values (1, 2) on duplicate key update b = values(b)")
require.Equal(t, tk.Session().LastInsertID(), uint64(1))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

tk.MustQuery("select * from tb").Sort().Check(testkit.Rows("1 1 2", "2 2 2"))

// When the new row and the old row are exactly the same (no inserted or updated values), mysql_insert_id() is 0
tk.MustExec("insert into tb (a, b) values (1, 2) on duplicate key update b = 2")
require.Equal(t, tk.Session().LastInsertID(), uint64(0))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

// When the value of auto increment column is assigned explicitly, LAST_INSERT_ID() is unchanged.
// mysql_insert_id() is set to the explicit assigned value.
tk.MustExec("insert into tb values (6, 6, 6)")
require.Equal(t, tk.Session().LastInsertID(), uint64(6))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

// Update statement touches neigher mysql_insert_id() nor LAST_INSERT_ID()
tk.MustExec("update tb set b = 7, pk = pk + 1 where b = 6")
require.Equal(t, tk.Session().LastInsertID(), uint64(0))
tk.MustQuery("select LAST_INSERT_ID()").Check(testkit.Rows("2"))

// How to distinguish LAST_INSERT_ID() and mysql_insert_id()?
// In a word, LAST_INSERT_ID() is always get from auto allocated value, while mysql_insert_id() can be
// auto allocated or explicited specified.

// Another scenario mentioned by @lcwangcao
// What's the behaviour when transaction conflict involved?
tk.MustExec("truncate table tb")
tk.MustExec("insert into tb (a, b) values (1, 1), (2, 2)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("begin")
tk1.MustExec("update tb set b = 2 where a = 1")
go func() {
time.Sleep(100 * time.Millisecond)
tk1.MustExec("commit")
}()
// The first time this will update one row.
// Then transaction conflict and retry, in the second time it modify nothing.
tk.MustExec("insert into tb(a, b) values(1,2) on duplicate key update b = 2;")
require.Equal(t, tk.Session().LastInsertID(), uint64(0))
}