Skip to content

Commit

Permalink
txn: add a variable to control whether to lock unchanged unique keys (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium authored Jun 26, 2023
1 parent 3de4c12 commit 3c45737
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 204 deletions.
77 changes: 57 additions & 20 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ func (e *InsertValues) initInsertColumns() error {
}
cols, missingColIdx = table.FindColumns(tableCols, columns, e.Table.Meta().PKIsHandle)
if missingColIdx >= 0 {
return errors.Errorf("INSERT INTO %s: unknown column %s",
e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O)
return errors.Errorf(
"INSERT INTO %s: unknown column %s",
e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O,
)
}
} else {
// If e.Columns are empty, use all columns instead.
Expand Down Expand Up @@ -375,7 +377,9 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression

var emptyRow chunk.Row

func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) (
[]types.Datum, error,
) {
rowLen := len(e.Table.Cols())
if e.hasExtraHandle {
rowLen++
Expand Down Expand Up @@ -652,7 +656,9 @@ func (e *InsertValues) fillColValue(
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) (
[]types.Datum, error,
) {
gCols := make([]*table.Column, 0)
tCols := e.Table.Cols()
if e.hasExtraHandle {
Expand Down Expand Up @@ -695,7 +701,12 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefID)
err := p.CheckForExchangePartition(
e.ctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -808,7 +819,9 @@ func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) {
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) (
[][]types.Datum, error,
) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
Expand Down Expand Up @@ -900,7 +913,9 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustAutoIncrementDatum(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, ok := retryInfo.GetCurrAutoIncrementID()
Expand Down Expand Up @@ -976,7 +991,9 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
return recordID, nil
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustAutoRandomDatum(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, ok := retryInfo.GetCurrAutoRandomID()
Expand Down Expand Up @@ -1075,7 +1092,9 @@ func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, f
return alloc.Rebase(ctx, autoRandomID, true)
}

func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustImplicitRowID(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
var err error
var recordID int64
if !hasValue {
Expand Down Expand Up @@ -1123,7 +1142,11 @@ func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64)
alloc := e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType)
tableInfo := e.Table.Meta()

shardFmt := autoid.NewShardIDFormat(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits, autoid.RowIDBitLength)
shardFmt := autoid.NewShardIDFormat(
types.NewFieldType(mysql.TypeLonglong),
tableInfo.ShardRowIDBits,
autoid.RowIDBitLength,
)
newTiDBRowIDBase := shardFmt.IncrementalMask() & recordID

return alloc.Rebase(ctx, newTiDBRowIDBase, true)
Expand Down Expand Up @@ -1151,9 +1174,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum,
func (e *InsertValues) batchCheckAndInsert(
ctx context.Context, rows [][]types.Datum,
addRecord func(ctx context.Context, row []types.Datum) error,
replace bool) error {
replace bool,
) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
Expand Down Expand Up @@ -1207,7 +1232,8 @@ CheckAndInsert:
if err == nil {
if !replace {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.ctx.GetSessionVars().LockUnchangedKeys {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedKeyForLock(r.handleKey.newKey)
}
Expand Down Expand Up @@ -1238,7 +1264,8 @@ CheckAndInsert:
if !replace {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.ctx.GetSessionVars().LockUnchangedKeys {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedKeyForLock(uk.newKey)
}
Expand Down Expand Up @@ -1302,9 +1329,11 @@ func (e *InsertValues) removeRow(
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
logutil.BgLogger().Error(
"get old row failed when replace",
zap.String("handle", handle.String()),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)),
)
if kv.IsErrNotFound(err) {
err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %s", handle)
}
Expand All @@ -1319,7 +1348,11 @@ func (e *InsertValues) removeRow(
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}
if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, lockRowKey|lockUniqueKeys); err != nil {
keySet := lockRowKey
if e.ctx.GetSessionVars().LockUnchangedKeys {
keySet |= lockUniqueKeys
}
if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, keySet); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -1363,7 +1396,9 @@ func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}

func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.Datum, reserveAutoIDCount int) (err error) {
func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int,
) (err error) {
vars := e.ctx.GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
Expand Down Expand Up @@ -1443,10 +1478,12 @@ func (e *InsertRuntimeStat) String() string {
buf.WriteString(", ")
}
if e.Prefetch > 0 {
fmt.Fprintf(buf, "check_insert: {total_time: %v, mem_insert_time: %v, prefetch: %v",
fmt.Fprintf(
buf, "check_insert: {total_time: %v, mem_insert_time: %v, prefetch: %v",
execdetails.FormatDuration(e.CheckInsertTime),
execdetails.FormatDuration(e.CheckInsertTime-e.Prefetch),
execdetails.FormatDuration(e.Prefetch))
execdetails.FormatDuration(e.Prefetch),
)
if e.FKCheckTime > 0 {
fmt.Fprintf(buf, ", fk_check: %v", execdetails.FormatDuration(e.FKCheckTime))
}
Expand Down
132 changes: 75 additions & 57 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,68 +1510,86 @@ func TestIssue32213(t *testing.T) {
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
func TestInsertLockUnchangedKeys(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
for _, shouldLock := range []bool{false} {
for _, tt := range []struct {
name string
ddl string
dml string
isClusteredPK bool
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
true,
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
false,
},
{
"insert-ignore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
true,
},
{
"insert-ignore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
false,
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
true,
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
false,
},
} {
t.Run(
tt.name+"-"+strconv.FormatBool(shouldLock), func(t *testing.T) {
tk1.MustExec(fmt.Sprintf("set @@tidb_lock_unchanged_keys = %v", shouldLock))
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
errCh := make(chan error)
go func() {
_, err := tk2.Exec("insert into t values (1)")
errCh <- err
}()
select {
case <-errCh:
if shouldLock {
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
}
close(errCh)
case <-time.After(200 * time.Millisecond):
if !shouldLock && !tt.isClusteredPK {
require.Failf(t, "txn2 is blocked by %q", tt.dml)
}
}
tk1.MustExec("commit")
<-errCh
tk1.MustQuery("select * from t").Check(testkit.Rows("1"))
},
)
}
}
}
Loading

0 comments on commit 3c45737

Please sign in to comment.