Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix auto-id allocation during statements retry (#20659) #21079

Merged
merged 6 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1401,11 +1401,6 @@ error = '''
The isolation level '%s' is not supported. Set tidb_skip_isolation_level_check=1 to skip this error
'''

["variable:8054"]
error = '''
cannot set variable to null
'''

["variable:8055"]
error = '''
snapshot is older than GC safe point %s
Expand Down
134 changes: 51 additions & 83 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -622,41 +623,20 @@ func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table
return false
}

func (e *InsertValues) hasAutoIncrementColumn() (int, bool) {
colIdx := -1
for i, c := range e.Table.Cols() {
func findAutoIncrementColumn(t table.Table) (col *table.Column, offsetInRow int, found bool) {
for i, c := range t.Cols() {
if mysql.HasAutoIncrementFlag(c.Flag) {
colIdx = i
break
return c, i, true
}
}
return colIdx, colIdx != -1
return nil, -1, false
}

func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) {
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return nil, err
}
autoDatum.SetAutoID(id, col.Flag)

if autoDatum, err = col.HandleBadNull(autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
}
return rows, nil
func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col *table.Column) error {
d.SetAutoID(id, col.Flag)
var err error
*d, err = table.CastValue(ctx, *d, col.ToInfo(), false, false)
return err
}

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
Expand All @@ -666,22 +646,14 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
if !e.lazyFillAutoID {
return rows, nil
}
// No autoIncrement column means no need to fill.
colIdx, ok := e.hasAutoIncrementColumn()
if !ok {
col, idx, found := findAutoIncrementColumn(e.Table)
if !found {
return rows, nil
}
// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx)
}
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]
rowCount := len(rows)
for processedIdx := 0; processedIdx < rowCount; processedIdx++ {
autoDatum := rows[processedIdx][idx]

var err error
var recordID int64
Expand All @@ -699,18 +671,32 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
retryInfo.AddAutoIncrementID(recordID)
rows[i][colIdx] = autoDatum
continue
}

// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
// Consume the auto IDs in RetryInfo first.
for retryInfo.Retrying && processedIdx < rowCount {
nextID, ok := retryInfo.GetCurrAutoIncrementID()
if !ok {
break
}
err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], nextID, col)
if err != nil {
return nil, err
}
processedIdx++
if processedIdx == rowCount {
return rows, nil
}
}
// Find consecutive num.
start := i
start := processedIdx
cnt := 1
for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) {
i++
for processedIdx+1 < rowCount && e.isAutoNull(ctx, rows[processedIdx+1][idx], col) {
processedIdx++
cnt++
}
// AllocBatchAutoIncrementValue allocates batch N consecutive autoIDs.
Expand All @@ -727,44 +713,33 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j)*uint64(increment))
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

// The value of d is adjusted by auto ID, so we need to cast it again.
d, err := table.CastValue(e.ctx, d, col.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &rows[offset][idx], id, col)
if err != nil {
return nil, err
}
rows[offset][colIdx] = d
retryInfo.AddAutoIncrementID(id)
}
continue
}

autoDatum.SetAutoID(recordID, col.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], recordID, col)
if err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
retryInfo.AddAutoIncrementID(recordID)
}
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return types.Datum{}, err
id, ok := retryInfo.GetCurrAutoIncrementID()
if ok {
d.SetAutoID(id, c.Flag)
return d, nil
}
d.SetAutoID(id, c.Flag)
return d, nil
}

var err error
Expand Down Expand Up @@ -803,20 +778,16 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat
}
}

d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c)
if err != nil {
return types.Datum{}, err
}
return casted, nil
retryInfo.AddAutoIncrementID(recordID)
return d, nil
}

func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int64, error) {
var recordID int64

switch target.Tp {
case mysql.TypeFloat, mysql.TypeDouble:
f := d.GetFloat64()
Expand All @@ -837,12 +808,11 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
autoRandomID, ok := retryInfo.GetCurrAutoRandomID()
if ok {
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

var err error
Expand Down Expand Up @@ -889,14 +859,12 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
}
}

d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)

casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c)
if err != nil {
return types.Datum{}, err
}
return casted, nil
retryInfo.AddAutoRandomID(recordID)
return d, nil
}

// allocAutoRandomID allocates a random id for primary key column. It assumes tableInfo.AutoRandomBits > 0.
Expand Down
46 changes: 46 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,52 @@ func (s *seqTestSuite) TestPessimisticConflictRetryAutoID(c *C) {
}
}

func (s *seqTestSuite) TestInsertFromSelectConflictRetryAutoID(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int not null auto_increment unique key, idx int unique key, c int);")
tk.MustExec("create table src (a int);")
concurrency := 2
var wg sync.WaitGroup
var err []error
wgCount := concurrency + 1
wg.Add(wgCount)
err = make([]error, concurrency)
for i := 0; i < concurrency; i++ {
tk := testkit.NewTestKitWithInit(c, s.store)
go func(idx int) {
for i := 0; i < 10; i++ {
sql := fmt.Sprintf("insert into t(idx, c) select 1 as idx, 1 as c from src on duplicate key update c = %[1]d", i)
_, e := tk.Exec(sql)
if e != nil {
err[idx] = e
wg.Done()
return
}
}
wg.Done()
}(i)
}
var insertErr error
go func() {
tk := testkit.NewTestKitWithInit(c, s.store)
for i := 0; i < 10; i++ {
_, e := tk.Exec("insert into src values (null);")
if e != nil {
insertErr = e
wg.Done()
return
}
}
wg.Done()
}()
wg.Wait()
for _, e := range err {
c.Assert(e, IsNil)
}
c.Assert(insertErr, IsNil)
}

func (s *seqTestSuite) TestAutoRandIDRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

Expand Down
2 changes: 0 additions & 2 deletions sessionctx/variable/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (

// Error instances.
var (
errCantGetValidID = dbterror.ClassVariable.NewStd(mysql.ErrCantGetValidID)
errWarnDeprecatedSyntax = dbterror.ClassVariable.NewStd(mysql.ErrWarnDeprecatedSyntax)
ErrCantSetToNull = dbterror.ClassVariable.NewStd(mysql.ErrCantSetToNull)
ErrSnapshotTooOld = dbterror.ClassVariable.NewStd(mysql.ErrSnapshotTooOld)
ErrUnsupportedValueForVar = dbterror.ClassVariable.NewStd(mysql.ErrUnsupportedValueForVar)
ErrUnknownSystemVar = dbterror.ClassVariable.NewStd(mysql.ErrUnknownSystemVariable)
Expand Down
10 changes: 5 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *RetryInfo) AddAutoIncrementID(id int64) {
}

// GetCurrAutoIncrementID gets current autoIncrementID.
func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) {
func (r *RetryInfo) GetCurrAutoIncrementID() (int64, bool) {
return r.autoIncrementIDs.getCurrent()
}

Expand All @@ -92,7 +92,7 @@ func (r *RetryInfo) AddAutoRandomID(id int64) {
}

// GetCurrAutoRandomID gets current AutoRandomID.
func (r *RetryInfo) GetCurrAutoRandomID() (int64, error) {
func (r *RetryInfo) GetCurrAutoRandomID() (int64, bool) {
return r.autoRandomIDs.getCurrent()
}

Expand All @@ -112,13 +112,13 @@ func (r *retryInfoAutoIDs) clean() {
}
}

func (r *retryInfoAutoIDs) getCurrent() (int64, error) {
func (r *retryInfoAutoIDs) getCurrent() (int64, bool) {
if r.currentOffset >= len(r.autoIDs) {
return 0, errCantGetValidID
return 0, false
}
id := r.autoIDs[r.currentOffset]
r.currentOffset++
return id, nil
return id, true
}

// stmtFuture is used to async get timestamp for statement.
Expand Down