Skip to content

Commit

Permalink
executor: lock duplicated keys on insert-ignore & replace-nothing (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 11, 2023
1 parent cee7565 commit f01eaf1
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 17 deletions.
12 changes: 12 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedRowKey(r.handleKey.newKey)
}
continue
}
} else if !kv.IsErrNotFound(err) {
Expand All @@ -1177,6 +1181,10 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err == nil {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
}
skip = true
break
}
Expand Down Expand Up @@ -1225,6 +1233,10 @@ func (e *InsertValues) removeRow(ctx context.Context, txn kv.Transaction, r toBe
return err
}
if identical {
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return err
}
return nil
}

Expand Down
66 changes: 66 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,3 +1481,69 @@ func TestIssue32213(t *testing.T) {
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999"))
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
}
}
4 changes: 4 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle
}
if rowUnchanged {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return false, err
}
return true, nil
}

Expand Down
36 changes: 20 additions & 16 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, nil
_, err := appendUnchangedRowForLock(sctx, t, h, oldData)
return false, err
}

// Fill values into on-update-now fields, only if they are really changed.
Expand Down Expand Up @@ -231,6 +217,24 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
return true, nil
}

func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return false, nil
}
physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, row)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx.AddUnchangedRowKey(unchangedRowKey)
return true, nil
}

func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error {
tableInfo := t.Meta()
if !tableInfo.ContainsAutoRandomBits() {
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func TestOptimisticConflicts(t *testing.T) {
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")
tk.MustExec("select * from conflict where id in (1, 2, 3)")

tk2.MustExec("update conflict set c = c - 1")

Expand Down

0 comments on commit f01eaf1

Please sign in to comment.