Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: lock duplicated keys on insert-ignore & replace-nothing (#42210) #42287

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,22 +1087,76 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if r.handleKey != nil {
_, err := txn.Get(ctx, r.handleKey.newKey)
if err == nil {
<<<<<<< HEAD
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
}
if !kv.IsErrNotFound(err) {
=======
if replace {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}
_, err2 := e.removeRow(ctx, txn, handle, r, false)
if err2 != nil {
return err2
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedRowKey(r.handleKey.newKey)
}
continue
}
} else if !kv.IsErrNotFound(err) {
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
return err
}
}
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
if err == nil {
<<<<<<< HEAD
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
skip = true
break
}
if !kv.IsErrNotFound(err) {
=======
if replace {
_, handle, err := tables.FetchDuplicatedHandle(
ctx,
uk.newKey,
true,
txn,
e.Table.Meta().ID,
uk.commonHandle,
)
if err != nil {
return err
}
if handle == nil {
continue
}
_, err = e.removeRow(ctx, txn, handle, r, true)
if err != nil {
return err
}
} else {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
}
skip = true
break
}
} else if !kv.IsErrNotFound(err) {
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
return err
}
}
Expand All @@ -1124,6 +1178,80 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

<<<<<<< HEAD
=======
// removeRow removes the duplicate row and cleanup its keys in the key-value map.
// But if the to-be-removed row equals to the to-be-added row, no remove or add
// things to do and return (true, nil).
func (e *InsertValues) removeRow(
ctx context.Context,
txn kv.Transaction,
handle kv.Handle,
r toBeCheckedRow,
inReplace bool,
) (bool, error) {
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
zap.String("handle", handle.String()),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
if kv.IsErrNotFound(err) {
err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %s", handle)
}
return false, err
}

identical, err := e.equalDatumsAsBinary(oldRow, newRow)
if err != nil {
return false, err
}
if identical {
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return false, err
}
return true, nil
}

err = r.t.RemoveRecord(e.ctx, handle, oldRow)
if err != nil {
return false, err
}
err = onRemoveRowForFK(e.ctx, oldRow, e.fkChecks, e.fkCascades)
if err != nil {
return false, err
}
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
} else {
e.ctx.GetSessionVars().StmtCtx.AddDeletedRows(1)
}

return false, nil
}

// equalDatumsAsBinary compare if a and b contains the same datum values in binary collation.
func (e *InsertValues) equalDatumsAsBinary(a []types.Datum, b []types.Datum) (bool, error) {
if len(a) != len(b) {
return false, nil
}
for i, ai := range a {
v, err := ai.Compare(e.ctx.GetSessionVars().StmtCtx, &b[i], collate.GetBinaryCollator())
if err != nil {
return false, errors.Trace(err)
}
if v != 0 {
return false, nil
}
}
return true, nil
}

>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}
Expand Down
103 changes: 103 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,3 +1852,106 @@ func (s *testSuite13) TestReplaceAllocatingAutoID(c *C) {
// Note that this error is different from MySQL's duplicated primary key error.
tk.MustGetErrCode("REPLACE INTO t1 VALUES (0,'newmaxvalue');", errno.ErrAutoincReadFailed)
}
<<<<<<< HEAD
=======

func TestInsertIntoSelectError(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("DROP TABLE IF EXISTS t1;")
tk.MustExec("CREATE TABLE t1(a INT) ENGINE = InnoDB;")
tk.MustExec("INSERT IGNORE into t1(SELECT SLEEP(NULL));")
tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows("Warning 1210 Incorrect arguments to sleep"))
tk.MustExec("INSERT IGNORE into t1(SELECT SLEEP(-1));")
tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows("Warning 1210 Incorrect arguments to sleep"))
tk.MustExec("INSERT IGNORE into t1(SELECT SLEEP(1));")
tk.MustQuery("SELECT * FROM t1;").Check(testkit.Rows("0", "0", "0"))
tk.MustExec("DROP TABLE t1;")
}

// https://github.com/pingcap/tidb/issues/32213.
func TestIssue32213(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)

tk.MustExec("create table test.t1(c1 float)")
tk.MustExec("insert into test.t1 values(999.99)")
tk.MustQuery("select cast(test.t1.c1 as decimal(4, 1)) from test.t1").Check(testkit.Rows("999.9"))
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 1)) from test.t1").Check(testkit.Rows("1000.0"))

tk.MustExec("drop table if exists test.t1")
tk.MustExec("create table test.t1(c1 decimal(6, 4))")
tk.MustExec("insert into test.t1 values(99.9999)")
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999"))
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
}
}
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
36 changes: 20 additions & 16 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, nil
_, err := appendUnchangedRowForLock(sctx, t, h, oldData)
return false, err
}

// 4. Fill values into on-update-now fields, only if they are really changed.
Expand Down Expand Up @@ -222,6 +208,24 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
return true, nil
}

func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return false, nil
}
physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, row)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx.AddUnchangedRowKey(unchangedRowKey)
return true, nil
}

func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error {
tableInfo := t.Meta()
if !tableInfo.ContainsAutoRandomBits() {
Expand Down
2 changes: 1 addition & 1 deletion session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")
tk.MustExec("select * from conflict where id in (1, 2, 3)")

tk2.MustExec("update conflict set c = c - 1")

Expand Down