Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Add insert/replace ignore/on duplicate key support for local temporary table #26636

Merged
merged 17 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func formatDataForDupError(data []types.Datum) (string, error) {

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle kv.Handle,
func getOldRow(ctx context.Context, sctx sessionctx.Context, kvGetter kv.Getter, t table.Table, handle kv.Handle,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle))
oldValue, err := kvGetter.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle))
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8463,9 +8463,19 @@ func (s testSerialSuite) assertTemporaryTableNoNetwork(c *C, temporaryTableType
tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ b from tmp_t where a = 1").Check(testkit.Rows("1"))
tk.MustExec("rollback")

// prepare some data for local temporary table, when for global temporary table, the below operations have no effect.
tk.MustExec("insert into tmp_t value(10, 10, 10)")
tk.MustExec("insert into tmp_t value(11, 11, 11)")

// Pessimistic lock
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tmp_t values (3, 3, 3)")
tk.MustExec("insert ignore into tmp_t values (4, 4, 4)")
tk.MustExec("insert into tmp_t values (5, 5, 5) on duplicate key update a=100")
tk.MustExec("insert into tmp_t values (10, 10, 10) on duplicate key update a=100")
tk.MustExec("insert ignore into tmp_t values (10, 10, 10) on duplicate key update id=11")
tk.MustExec("replace into tmp_t values(6, 6, 6)")
tk.MustExec("replace into tmp_t values(11, 100, 100)")
tk.MustExec("update tmp_t set id = id + 1 where a = 1")
tk.MustExec("delete from tmp_t where a > 1")
tk.MustQuery("select count(*) from tmp_t where a >= 1 for update")
Expand Down
23 changes: 16 additions & 7 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"runtime/trace"
"time"

"github.com/pingcap/parser/model"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -166,7 +168,12 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
// Temporary table need not to do prefetch because its all data are stored in the memory.
if e.Table.Meta().TempTableType != model.TempTableNone {
return nil
}

tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand All @@ -180,8 +187,8 @@ func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheck
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, kvGetter kv.Getter, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, kvGetter, row.t, handle, e.GenExprs)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,20 +229,22 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
if err = e.prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}
if e.stats != nil {
e.stats.Prefetch += time.Since(prefetchStart)
}

txnValueGetter := e.txnValueGetter(txn)
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate)
if err == nil {
continue
}
Expand All @@ -245,7 +254,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKey)
val, err := txnValueGetter.Get(ctx, uk.newKey)
if err != nil {
if kv.IsErrNotFound(err) {
continue
Expand All @@ -257,7 +266,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand Down
22 changes: 18 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,21 +1051,26 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
prefetchStart := time.Now()
// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
// Temporary table need not to do prefetch because its all data are stored in the memory.
if e.Table.Meta().TempTableType == model.TempTableNone {
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is what is the return of perfetchUniqueIndices. If it returns an empty data, it seems not fit the semantics of the method. If it returns the data in temporary table, it seems unnecessary and no one depends the return to do anything...

return err
}
}

if e.stats != nil {
e.stats.Prefetch += time.Since(prefetchStart)
}

txnValueGetter := e.txnValueGetter(txn)
// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
if r.ignored {
continue
}
skip := false
if r.handleKey != nil {
_, err := txn.Get(ctx, r.handleKey.newKey)
_, err := txnValueGetter.Get(ctx, r.handleKey.newKey)
if err == nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
Expand All @@ -1075,7 +1080,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
_, err := txnValueGetter.Get(ctx, uk.newKey)
if err == nil {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
Expand Down Expand Up @@ -1104,6 +1109,15 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

func (e *InsertValues) txnValueGetter(txn kv.Transaction) kv.Getter {
tblInfo := e.Table.Meta()
if tblInfo.TempTableType == model.TempTableNone {
return txn
}

return e.ctx.GetSessionVars().TemporaryTableTxnReader(txn, tblInfo)
}

func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)

// Local temporary table always get snapshot value from session
if e.tblInfo.TempTableType == model.TempTableLocal {
return e.ctx.GetSessionVars().GetTemporaryTableSnapshotValue(ctx, key)
return e.ctx.GetSessionVars().TemporaryTableSnapshotReader(e.tblInfo).Get(ctx, key)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}

lock := e.tblInfo.Lock
Expand Down
20 changes: 11 additions & 9 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (e *ReplaceExec) Open(ctx context.Context) error {

// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle kv.Handle, r toBeCheckedRow) (bool, error) {
func (e *ReplaceExec) removeRow(ctx context.Context, kvGetter kv.Getter, handle kv.Handle, r toBeCheckedRow) (bool, error) {
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
oldRow, err := getOldRow(ctx, e.ctx, kvGetter, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
zap.String("handle", handle.String()),
Expand Down Expand Up @@ -119,14 +119,15 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
return err
}

txnValueGetter := e.txnValueGetter(txn)
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

if _, err := txn.Get(ctx, r.handleKey.newKey); err == nil {
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if _, err := txnValueGetter.Get(ctx, r.handleKey.newKey); err == nil {
rowUnchanged, err := e.removeRow(ctx, txnValueGetter, handle, r)
if err != nil {
return err
}
Expand All @@ -142,7 +143,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {

// Keep on removing duplicated rows.
for {
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txn, r)
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txnValueGetter, r)
if err != nil {
return err
}
Expand All @@ -169,9 +170,9 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
// 2. bool: true when found the duplicated key. This only means that duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) {
func (e *ReplaceExec) removeIndexRow(ctx context.Context, kvGetter kv.Getter, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKey)
val, err := kvGetter.Get(ctx, uk.newKey)
if err != nil {
if kv.IsErrNotFound(err) {
continue
Expand All @@ -182,7 +183,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r
if err != nil {
return false, true, err
}
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
rowUnchanged, err := e.removeRow(ctx, kvGetter, handle, r)
if err != nil {
return false, true, err
}
Expand Down Expand Up @@ -228,9 +229,10 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
if err = e.prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.Prefetch = time.Since(prefetchStart)
}
Expand Down
117 changes: 117 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4994,6 +4994,123 @@ func (s *testSessionSuite) TestLocalTemporaryTableInsert(c *C) {
tk.MustQuery("select * from tmp1 where id=5").Check(testkit.Rows())
}

func (s *testSessionSuite) TestLocalTemporaryTableInsertIgnore(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
tk.MustExec("use test")
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values(1, 11, 101)")
tk.MustExec("insert into tmp1 values(2, 12, 102)")

// test outside transaction
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert ignore into tmp1 values(5, 15, 105)")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=5").Check(testkit.Rows("5 15 105"))

// test in transaction and rollback
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert ignore into tmp1 values(3, 13, 103)")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=3").Check(testkit.Rows("3 13 103"))
tk.MustExec("insert ignore into tmp1 values(3, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '3' for key 'PRIMARY'"))
tk.MustQuery("select * from tmp1 where id=3").Check(testkit.Rows("3 13 103"))
tk.MustExec("rollback")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 102", "5 15 105"))

// test commit
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'"))
tk.MustExec("insert ignore into tmp1 values(3, 13, 103)")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustExec("insert ignore into tmp1 values(3, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '3' for key 'PRIMARY'"))
tk.MustExec("commit")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 102", "3 13 103", "5 15 105"))
}

func (s *testSessionSuite) TestLocalTemporaryTableInsertOnDuplicateKeyUpdate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
tk.MustExec("use test")
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values(1, 11, 101)")
tk.MustExec("insert into tmp1 values(2, 12, 102)")

// test outside transaction
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000) on duplicate key update u=12")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '12' for key 'u'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert into tmp1 values(2, 100, 1000) on duplicate key update v=202")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=2").Check(testkit.Rows("2 12 202"))
tk.MustExec("insert into tmp1 values(3, 13, 103) on duplicate key update v=203")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=3").Check(testkit.Rows("3 13 103"))

// test in transaction and rollback
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000) on duplicate key update u=12")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '12' for key 'u'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert into tmp1 values(2, 100, 1000) on duplicate key update v=302")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=2").Check(testkit.Rows("2 12 302"))
tk.MustExec("insert into tmp1 values(4, 14, 104) on duplicate key update v=204")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=4").Check(testkit.Rows("4 14 104"))
tk.MustExec("rollback")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 202", "3 13 103"))

// test commit
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000) on duplicate key update u=12")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '12' for key 'u'"))
tk.MustExec("insert into tmp1 values(2, 100, 1000) on duplicate key update v=302")
tk.MustExec("insert into tmp1 values(4, 14, 104) on duplicate key update v=204")
tk.MustExec("commit")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 302", "3 13 103", "4 14 104"))
}

func (s *testSessionSuite) TestLocalTemporaryTableReplace(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
tk.MustExec("use test")
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values(1, 11, 101)")
tk.MustExec("insert into tmp1 values(2, 12, 102)")
tk.MustExec("insert into tmp1 values(3, 13, 103)")

// out of transaction
tk.MustExec("replace into tmp1 values(1, 12, 1000)")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 12 1000", "3 13 103"))
tk.MustExec("replace into tmp1 values(4, 14, 104)")
tk.MustQuery("select * from tmp1 where id=4").Check(testkit.Rows("4 14 104"))

// in transaction and rollback
tk.MustExec("begin")
tk.MustExec("replace into tmp1 values(1, 13, 999)")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 13 999", "4 14 104"))
tk.MustExec("replace into tmp1 values(5, 15, 105)")
tk.MustQuery("select * from tmp1 where id=5").Check(testkit.Rows("5 15 105"))
tk.MustExec("rollback")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 12 1000", "3 13 103", "4 14 104"))

// out of transaction
tk.MustExec("begin")
tk.MustExec("replace into tmp1 values(1, 13, 999)")
tk.MustExec("replace into tmp1 values(5, 15, 105)")
tk.MustExec("commit")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 13 999", "4 14 104", "5 15 105"))
}

func (s *testSessionSuite) TestLocalTemporaryTableDelete(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
Expand Down
Loading