Skip to content

Commit

Permalink
executor: Fix a bug in 'INSERT ... ON DUPLICATE KEY UPDATE' (#7675)
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored and jackysp committed Sep 14, 2018
1 parent 08e56cd commit 33a30cc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 16 deletions.
22 changes: 15 additions & 7 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type toBeCheckedRow struct {
rowValue []byte
handleKey *keyValueWithDupInfo
uniqueKeys []*keyValueWithDupInfo
// The table or partition this row belongs to.
// t is the table or partition this row belongs to.
t table.Table
}

type batchChecker struct {
// For duplicate key update
// toBeCheckedRows is used for duplicate key update
toBeCheckedRows []toBeCheckedRow
dupKVs map[string][]byte
dupOldRowValues map[string][]byte
Expand Down Expand Up @@ -257,13 +257,21 @@ func (b *batchChecker) fillBackKeys(t table.Table, row toBeCheckedRow, handle in
}
}

func (b *batchChecker) deleteDupKeys(row toBeCheckedRow) {
if row.handleKey != nil {
delete(b.dupKVs, string(row.handleKey.newKV.key))
// deleteDupKeys picks primary/unique key-value pairs from rows and remove them from the dupKVs
func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, t, rows)
if err != nil {
return errors.Trace(err)
}
for _, uk := range row.uniqueKeys {
delete(b.dupKVs, string(uk.newKV.key))
for _, row := range cleanupRows {
if row.handleKey != nil {
delete(b.dupKVs, string(row.handleKey.newKV.key))
}
for _, uk := range row.uniqueKeys {
delete(b.dupKVs, string(uk.newKV.key))
}
}
return nil
}

// getOldRow gets the table record row from storage for batch check.
Expand Down
12 changes: 8 additions & 4 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate
if err != nil {
return errors.Trace(err)
}
return e.updateDupKeyValues(row, handle, newHandle, handleChanged, updatedRow)
return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowUpdate updates the duplicate row.
Expand Down Expand Up @@ -215,15 +215,19 @@ func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow [
}

// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
func (e *InsertExec) updateDupKeyValues(row toBeCheckedRow, oldHandle int64,
newHandle int64, handleChanged bool, updatedRow []types.Datum) error {
func (e *InsertExec) updateDupKeyValues(oldHandle int64, newHandle int64,
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
// There is only one row per update.
fillBackKeysInRows, err := e.getKeysNeedCheck(e.ctx, e.Table, [][]types.Datum{updatedRow})
if err != nil {
return errors.Trace(err)
}
// Delete old keys and fill back new key-values of the updated row.
e.deleteDupKeys(row)
err = e.deleteDupKeys(e.ctx, e.Table, [][]types.Datum{oldRow})
if err != nil {
return errors.Trace(err)
}

if handleChanged {
delete(e.dupOldRowValues, string(e.Table.RecordKey(oldHandle)))
e.fillBackKeys(e.Table, fillBackKeysInRows[0], newHandle)
Expand Down
28 changes: 28 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,34 @@ func (s *testSuite) TestInsertOnDuplicateKey(c *C) {
tk.MustQuery(`select * from t;`).Check(testkit.Rows(`1 2 3`))
tk.MustExec(`insert into t (v, k1, k2) select c, a, b from (select "3" c, "1" a, "2" b) tmp on duplicate key update v=c;`)
tk.MustQuery(`select * from t;`).Check(testkit.Rows(`1 2 3`))

tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(id int, a int, b int);`)
tk.MustExec(`insert into t1 values (1, 1, 1);`)
tk.MustExec(`insert into t1 values (2, 2, 1);`)
tk.MustExec(`insert into t1 values (3, 3, 1);`)
tk.MustExec(`create table t2(a int primary key, b int, unique(b));`)
tk.MustExec(`insert into t2 select a, b from t1 order by id on duplicate key update a=t1.a, b=t1.b;`)
tk.MustQuery(`select * from t2 order by a;`).Check(testkit.Rows(`3 1`))

tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(id int, a int, b int);`)
tk.MustExec(`insert into t1 values (1, 1, 1);`)
tk.MustExec(`insert into t1 values (2, 1, 2);`)
tk.MustExec(`insert into t1 values (3, 3, 1);`)
tk.MustExec(`create table t2(a int primary key, b int, unique(b));`)
tk.MustExec(`insert into t2 select a, b from t1 order by id on duplicate key update a=t1.a, b=t1.b;`)
tk.MustQuery(`select * from t2 order by a;`).Check(testkit.Rows(`1 2`, `3 1`))

tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(id int, a int, b int, c int);`)
tk.MustExec(`insert into t1 values (1, 1, 1, 1);`)
tk.MustExec(`insert into t1 values (2, 2, 1, 2);`)
tk.MustExec(`insert into t1 values (3, 3, 2, 2);`)
tk.MustExec(`insert into t1 values (4, 4, 2, 2);`)
tk.MustExec(`create table t2(a int primary key, b int, c int, unique(b), unique(c));`)
tk.MustExec(`insert into t2 select a, b, c from t1 order by id on duplicate key update b=t2.b, c=t2.c;`)
tk.MustQuery(`select * from t2 order by a;`).Check(testkit.Rows(`1 1 1`, `3 2 2`))
}

func (s *testSuite) TestUpdateDuplicateKey(c *C) {
Expand Down
6 changes: 1 addition & 5 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,10 @@ func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
cleanupRows, err := e.getKeysNeedCheck(e.ctx, r.t, [][]types.Datum{oldRow})
err = e.deleteDupKeys(e.ctx, r.t, [][]types.Datum{oldRow})
if err != nil {
return false, errors.Trace(err)
}
if len(cleanupRows) > 0 {
// The length of need-to-cleanup rows should be at most 1, due to we only input 1 row.
e.deleteDupKeys(cleanupRows[0])
}
return false, nil
}

Expand Down

0 comments on commit 33a30cc

Please sign in to comment.