Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, planner: ON DUPLICATE UPDATE can refer to un-project col (#14412) #22233

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
allAssignmentsAreConstant: v.AllAssignmentsAreConstant,
hasRefCols: v.NeedFillDefaultValue,
SelectExec: selectExec,
rowLen: v.RowLen,
}
err := ivs.initInsertColumns()
if err != nil {
Expand Down
24 changes: 21 additions & 3 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,20 @@ func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheck
}

// updateDupRow updates a duplicate row to a new row.
<<<<<<< HEAD
func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
=======
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
>>>>>>> c7165bc51... executor, planner: ON DUPLICATE UPDATE can refer to un-project col (#14412)
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}
// get the extra columns from the SELECT clause and get the final `oldRow`.
if len(e.ctx.GetSessionVars().CurrInsertBatchExtraCols) > 0 {
extraCols := e.ctx.GetSessionVars().CurrInsertBatchExtraCols[idxInBatch]
oldRow = append(oldRow, extraCols...)
}

_, _, _, err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, e.OnDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
Expand Down Expand Up @@ -217,7 +226,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

err = e.updateDupRow(ctx, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
Expand All @@ -239,7 +248,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

err = e.updateDupRow(ctx, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand Down Expand Up @@ -285,6 +294,7 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0]
e.setMessage()
if e.SelectExec != nil {
return e.SelectExec.Close()
Expand Down Expand Up @@ -315,12 +325,20 @@ func (e *InsertExec) initEvalBuffer4Dup() {
// Use writable columns for old row for update.
numWritableCols := len(e.Table.WritableCols())

evalBufferTypes := make([]*types.FieldType, 0, numCols+numWritableCols)
extraLen := 0
if e.SelectExec != nil {
extraLen = e.SelectExec.Schema().Len() - e.rowLen
}

evalBufferTypes := make([]*types.FieldType, 0, numCols+numWritableCols+extraLen)

// Append the old row before the new row, to be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
for _, col := range e.Table.WritableCols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
if extraLen > 0 {
evalBufferTypes = append(evalBufferTypes, e.SelectExec.base().retFieldTypes[numWritableCols:]...)
}
for _, col := range e.Table.Cols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
Expand Down
23 changes: 18 additions & 5 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type InsertValues struct {
lazyFillAutoID bool
memTracker *memory.Tracker

rowLen int

stats *InsertRuntimeStat
}

Expand Down Expand Up @@ -418,7 +420,9 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML
batchSize := sessVars.DMLBatchSize
memUsageOfRows := int64(0)
memUsageOfExtraCols := int64(0)
memTracker := e.memTracker
extraColsInSel := make([][]types.Datum, 0, chk.Capacity())
for {
err := Next(ctx, selectExec, chk)
if err != nil {
Expand All @@ -436,15 +440,20 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
if err != nil {
return err
}
extraColsInSel = append(extraColsInSel, innerRow[e.rowLen:])
rows = append(rows, row)
if batchInsert && e.rowCount%uint64(batchSize) == 0 {
memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows))
memTracker.Consume(memUsageOfRows)
memUsageOfExtraCols = types.EstimatedMemUsage(extraColsInSel[0], len(extraColsInSel))
memTracker.Consume(memUsageOfRows + memUsageOfExtraCols)
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = extraColsInSel
if err = base.exec(ctx, rows); err != nil {
return err
}
rows = rows[:0]
extraColsInSel = extraColsInSel[:0]
memTracker.Consume(-memUsageOfRows)
memTracker.Consume(-memUsageOfExtraCols)
memUsageOfRows = 0
if err = e.doBatchInsert(ctx); err != nil {
return err
Expand All @@ -454,14 +463,18 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {

if len(rows) != 0 {
memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows))
memTracker.Consume(memUsageOfRows)
memUsageOfExtraCols = types.EstimatedMemUsage(extraColsInSel[0], len(extraColsInSel))
memTracker.Consume(memUsageOfRows + memUsageOfExtraCols)
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = extraColsInSel
}
err = base.exec(ctx, rows)
if err != nil {
return err
}
rows = rows[:0]
extraColsInSel = extraColsInSel[:0]
memTracker.Consume(-memUsageOfRows)
memTracker.Consume(-memUsageOfExtraCols)
memTracker.Consume(-chkMemUsage)
}
return nil
Expand All @@ -484,9 +497,9 @@ func (e *InsertValues) doBatchInsert(ctx context.Context) error {
func (e *InsertValues) getRow(ctx context.Context, vals []types.Datum) ([]types.Datum, error) {
row := make([]types.Datum, len(e.Table.Cols()))
hasValue := make([]bool, len(e.Table.Cols()))
for i, v := range vals {
casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo(), false, false)
if e.handleErr(nil, &v, 0, err) != nil {
for i := 0; i < e.rowLen; i++ {
casted, err := table.CastValue(e.ctx, vals[i], e.insertColumns[i].ToInfo(), false, false)
if e.handleErr(nil, &vals[i], 0, err) != nil {
return nil, err
}

Expand Down
9 changes: 9 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ func (s *testSuite8) TestInsertOnDuplicateKey(c *C) {
c.Assert(tk.Se.AffectedRows(), Equals, uint64(7))
tk.CheckLastMessage("Records: 5 Duplicates: 2 Warnings: 0")

tk.MustExec("drop table if exists a, b")
tk.MustExec("create table a(x int primary key)")
tk.MustExec("create table b(x int, y int)")
tk.MustExec("insert into a values(1)")
tk.MustExec("insert into b values(1, 2)")
tk.MustExec("insert into a select x from b ON DUPLICATE KEY UPDATE a.x=b.y")
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery("select * from a").Check(testkit.Rows("2"))

// reproduce insert on duplicate key update bug under new row format.
tk.MustExec(`drop table if exists t1`)
tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`)
Expand Down
90 changes: 90 additions & 0 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,96 @@ type LoadDataInfo struct {
QuitCh chan struct{}
}

<<<<<<< HEAD
=======
// FieldMapping inticates the relationship between input field and table column or user variable
type FieldMapping struct {
Column *table.Column
UserVar *ast.VariableExpr
}

// initLoadColumns sets columns which the input fields loaded to.
func (e *LoadDataInfo) initLoadColumns(columnNames []string) error {
var cols []*table.Column
var missingColName string
var err error
tableCols := e.Table.Cols()

if len(columnNames) != len(tableCols) {
for _, v := range e.ColumnAssignments {
columnNames = append(columnNames, v.Column.Name.O)
}

cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle)
if missingColName != "" {
return errors.Errorf("LOAD DATA INTO %s: unknown column %s", e.Table.Meta().Name.O, missingColName)
}
} else {
cols = tableCols
}

for _, col := range cols {
if !col.IsGenerated() {
e.insertColumns = append(e.insertColumns, col)
}
if col.Name.L == model.ExtraHandleName.L {
if !e.ctx.GetSessionVars().AllowWriteRowID {
return errors.Errorf("load data statement for _tidb_rowid are not supported.")
}
e.hasExtraHandle = true
break
}
}
e.rowLen = len(e.insertColumns)
// Check column whether is specified only once.
err = table.CheckOnce(cols)
if err != nil {
return err
}

return nil
}

// initFieldMappings make a field mapping slice to implicitly map input field to table column or user defined variable
// the slice's order is the same as the order of the input fields.
// Returns a slice of same ordered column names without user defined variable names.
func (e *LoadDataInfo) initFieldMappings() []string {
columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments))
tableCols := e.Table.Cols()

if len(e.ColumnsAndUserVars) == 0 {
for _, v := range tableCols {
fieldMapping := &FieldMapping{
Column: v,
}
e.FieldMappings = append(e.FieldMappings, fieldMapping)
columns = append(columns, v.Name.O)
}

return columns
}

var column *table.Column

for _, v := range e.ColumnsAndUserVars {
if v.ColumnName != nil {
column = table.FindCol(tableCols, v.ColumnName.Name.O)
columns = append(columns, v.ColumnName.Name.O)
} else {
column = nil
}

fieldMapping := &FieldMapping{
Column: column,
UserVar: v.UserVar,
}
e.FieldMappings = append(e.FieldMappings, fieldMapping)
}

return columns
}

>>>>>>> c7165bc51... executor, planner: ON DUPLICATE UPDATE can refer to un-project col (#14412)
// GetRows getter for rows
func (e *LoadDataInfo) GetRows() [][]types.Datum {
return e.rows
Expand Down
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ type Insert struct {
NeedFillDefaultValue bool

AllAssignmentsAreConstant bool

RowLen int
}

// Update represents Update plan.
Expand Down
Loading