Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* : Multiple rows insert in a statement should have consecutive autoID if needed. (#11876) #12602

Merged
merged 5 commits into from
Oct 13, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 193 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ type InsertValues struct {
colDefaultVals []defaultVal
evalBuffer chunk.MutRow
evalBufferTypes []*types.FieldType

// Fill the autoID lazily to datum. This is used for being compatible with JDBC using getGeneratedKeys().
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
}

type defaultVal struct {
Expand Down Expand Up @@ -184,6 +190,8 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con
batchInsert := sessVars.BatchInsert && !sessVars.InTxn()
batchSize := sessVars.DMLBatchSize

e.lazyFillAutoID = true

rows := make([][]types.Datum, 0, len(e.Lists))
for i, list := range e.Lists {
e.rowCount++
Expand All @@ -193,6 +201,11 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con
}
rows = append(rows, row)
if batchInsert && e.rowCount%uint64(batchSize) == 0 {
// Before batch insert, fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
if err != nil {
return err
}
if err = exec(ctx, rows); err != nil {
return err
}
Expand All @@ -202,6 +215,11 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con
}
}
}
// Fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
if err != nil {
return err
}
return exec(ctx, rows)
}

Expand Down Expand Up @@ -259,7 +277,7 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression
row[offset], hasValue[offset] = *val1.Copy(), true
e.evalBuffer.SetDatum(offset, val1)
}

// Row may lack of generated column, autoIncrement column, empty column here.
return e.fillRow(ctx, row, hasValue)
}

Expand Down Expand Up @@ -413,6 +431,14 @@ func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (d types.D
func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum,
error) {
if mysql.HasAutoIncrementFlag(column.Flag) {
if e.lazyFillAutoID {
// Handle hasValue info in autoIncrement column previously for lazy handle.
if !hasValue {
datum.SetNull()
}
// Store the plain datum of autoIncrement column directly for lazy handle.
return datum, nil
}
d, err := e.adjustAutoIncrementDatum(ctx, datum, hasValue, column)
if err != nil {
return types.Datum{}, err
Expand All @@ -431,6 +457,10 @@ func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx

// fillRow fills generated columns, auto_increment column and empty column.
// For NOT NULL column, it will return error or use zero value based on sql_mode.
// When lazyFillAutoID is true, fill row will lazily handle auto increment datum for lazy batch allocation.
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool) ([]types.Datum, error) {
gIdx := 0
for i, c := range e.Table.Cols() {
Expand All @@ -454,13 +484,172 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
return nil, err
}
}
// Handle the bad null error. Cause generated column with `not null` flag will get default value datum in fillColValue
// which should be override by generated expr first, then handle the bad null logic here.
if !e.lazyFillAutoID || (e.lazyFillAutoID && !mysql.HasAutoIncrementFlag(c.Flag)) {
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
}
}
return row, nil
}

// Handle the bad null error.
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
// isAutoNull can help judge whether a datum is AutoIncrement Null quickly.
// This used to help lazyFillAutoIncrement to find consecutive N datum backwards for batch autoID alloc.
func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table.Column) bool {
var err error
var recordID int64
if !d.IsNull() {
recordID, err = getAutoRecordID(d, &col.FieldType, true)
if err != nil {
return false
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
return false
}
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
return true
}
return false
}

func (e *InsertValues) hasAutoIncrementColumn() (int, bool) {
colIdx := -1
for i, c := range e.Table.Cols() {
if mysql.HasAutoIncrementFlag(c.Flag) {
colIdx = i
break
}
}
return colIdx, colIdx != -1
}

func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) {
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return nil, err
}
autoDatum.SetAutoID(id, col.Flag)

if autoDatum, err = col.HandleBadNull(autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
}
return rows, nil
}

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
}
// No autoIncrement column means no need to fill.
colIdx, ok := e.hasAutoIncrementColumn()
if !ok {
return rows, nil
}
// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx)
}
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

var err error
var recordID int64
if !autoDatum.IsNull() {
recordID, err = getAutoRecordID(autoDatum, &col.FieldType, true)
if err != nil {
return nil, err
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.RebaseAutoID(e.ctx, recordID, true)
if err != nil {
return nil, err
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
retryInfo.AddAutoIncrementID(recordID)
rows[i][colIdx] = autoDatum
continue
}

// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
// Find consecutive num.
start := i
cnt := 1
for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) {
i++
cnt++
}
// Alloc batch N consecutive (min, max] autoIDs.
// max value can be derived from adding one for cnt times.
min, _, err := table.AllocBatchAutoIncrementValue(ctx, e.Table, e.ctx, cnt)
if e.filterErr(err) != nil {
return nil, err
}
// It's compatible with mysql setting the first allocated autoID to lastInsertID.
// Cause autoID may be specified by user, judge only the first row is not suitable.
if e.lastInsertID == 0 {
e.lastInsertID = uint64(min) + 1
}
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j) + 1)
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

// The value of d is adjusted by auto ID, so we need to cast it again.
d, err := table.CastValue(e.ctx, d, col.ToInfo())
if err != nil {
return nil, err
}
rows[offset][colIdx] = d
}
continue
}

autoDatum.SetAutoID(recordID, col.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo())
if err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
return row, nil
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
Expand Down
Loading