Skip to content

Commit

Permalink
*: use TemporaryTableSnapshotInterceptor to update tables
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Sep 20, 2021
1 parent cddb584 commit cc53474
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 97 deletions.
4 changes: 2 additions & 2 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ func formatDataForDupError(data []types.Datum) (string, error) {

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func getOldRow(ctx context.Context, sctx sessionctx.Context, kvGetter kv.Getter, t table.Table, handle kv.Handle,
func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle kv.Handle,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := kvGetter.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle))
oldValue, err := txn.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle))
if err != nil {
return nil, err
}
Expand Down
14 changes: 6 additions & 8 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
"runtime/trace"
"time"

"github.com/pingcap/parser/model"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -188,8 +187,8 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, kvGetter kv.Getter, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, kvGetter, row.t, handle, e.GenExprs)
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}
Expand Down Expand Up @@ -237,15 +236,14 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
e.stats.Prefetch += time.Since(prefetchStart)
}

txnValueGetter := e.txnValueGetter(txn)
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
Expand All @@ -255,7 +253,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}

for _, uk := range r.uniqueKeys {
val, err := txnValueGetter.Get(ctx, uk.newKey)
val, err := txn.Get(ctx, uk.newKey)
if err != nil {
if kv.IsErrNotFound(err) {
continue
Expand All @@ -267,7 +265,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand Down
14 changes: 2 additions & 12 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,15 +1065,14 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
e.stats.Prefetch += time.Since(prefetchStart)
}

txnValueGetter := e.txnValueGetter(txn)
// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
if r.ignored {
continue
}
skip := false
if r.handleKey != nil {
_, err := txnValueGetter.Get(ctx, r.handleKey.newKey)
_, err := txn.Get(ctx, r.handleKey.newKey)
if err == nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
Expand All @@ -1083,7 +1082,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
for _, uk := range r.uniqueKeys {
_, err := txnValueGetter.Get(ctx, uk.newKey)
_, err := txn.Get(ctx, uk.newKey)
if err == nil {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
Expand Down Expand Up @@ -1112,15 +1111,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

func (e *InsertValues) txnValueGetter(txn kv.Transaction) kv.Getter {
tblInfo := e.Table.Meta()
if tblInfo.TempTableType == model.TempTableNone {
return txn
}

return e.ctx.GetSessionVars().TemporaryTableTxnReader(txn, tblInfo)
}

func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}
Expand Down
17 changes: 8 additions & 9 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func (e *ReplaceExec) Open(ctx context.Context) error {

// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(ctx context.Context, kvGetter kv.Getter, handle kv.Handle, r toBeCheckedRow) (bool, error) {
func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle kv.Handle, r toBeCheckedRow) (bool, error) {
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, kvGetter, r.t, handle, e.GenExprs)
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
zap.String("handle", handle.String()),
Expand Down Expand Up @@ -120,15 +120,14 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
return err
}

txnValueGetter := e.txnValueGetter(txn)
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

if _, err := txnValueGetter.Get(ctx, r.handleKey.newKey); err == nil {
rowUnchanged, err := e.removeRow(ctx, txnValueGetter, handle, r)
if _, err := txn.Get(ctx, r.handleKey.newKey); err == nil {
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if err != nil {
return err
}
Expand All @@ -144,7 +143,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {

// Keep on removing duplicated rows.
for {
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txnValueGetter, r)
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txn, r)
if err != nil {
return err
}
Expand All @@ -171,9 +170,9 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
// 2. bool: true when found the duplicated key. This only means that duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(ctx context.Context, kvGetter kv.Getter, r toBeCheckedRow) (bool, bool, error) {
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
val, err := kvGetter.Get(ctx, uk.newKey)
val, err := txn.Get(ctx, uk.newKey)
if err != nil {
if kv.IsErrNotFound(err) {
continue
Expand All @@ -184,7 +183,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, kvGetter kv.Getter, r
if err != nil {
return false, true, err
}
rowUnchanged, err := e.removeRow(ctx, kvGetter, handle, r)
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if err != nil {
return false, true, err
}
Expand Down
64 changes: 0 additions & 64 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package variable

import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -2301,66 +2300,3 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 {
}
return s.seekFactor
}

// TemporaryTableSnapshotReader can read the temporary table snapshot data
type TemporaryTableSnapshotReader struct {
temporaryTableData TemporaryTableData
}

// Get gets the value for key k from snapshot.
func (s *TemporaryTableSnapshotReader) Get(ctx context.Context, k kv.Key) ([]byte, error) {
if s.temporaryTableData == nil {
return nil, kv.ErrNotExist
}

v, err := s.temporaryTableData.Get(ctx, k)
if err != nil {
return v, err
}

if len(v) == 0 {
return nil, kv.ErrNotExist
}

return v, nil
}

// TemporaryTableSnapshotReader can read the temporary table snapshot data
func (s *SessionVars) TemporaryTableSnapshotReader(tblInfo *model.TableInfo) *TemporaryTableSnapshotReader {
if tblInfo.TempTableType == model.TempTableGlobal {
return &TemporaryTableSnapshotReader{nil}
}
return &TemporaryTableSnapshotReader{s.TemporaryTableData}
}

// TemporaryTableTxnReader can read the temporary table txn data
type TemporaryTableTxnReader struct {
memBuffer kv.MemBuffer
snapshot *TemporaryTableSnapshotReader
}

// Get gets the value for key k from txn.
func (s *TemporaryTableTxnReader) Get(ctx context.Context, k kv.Key) ([]byte, error) {
v, err := s.memBuffer.Get(ctx, k)
if err == nil {
if len(v) == 0 {
return nil, kv.ErrNotExist
}

return v, nil
}

if !kv.IsErrNotFound(err) {
return v, err
}

return s.snapshot.Get(ctx, k)
}

// TemporaryTableTxnReader can read the temporary table txn data
func (s *SessionVars) TemporaryTableTxnReader(txn kv.Transaction, tblInfo *model.TableInfo) *TemporaryTableTxnReader {
return &TemporaryTableTxnReader{
memBuffer: txn.GetMemBuffer(),
snapshot: s.TemporaryTableSnapshotReader(tblInfo),
}
}
2 changes: 1 addition & 1 deletion table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
var value []byte
if c.tblInfo.TempTableType != model.TempTableNone {
// Always check key for temporary table because it does not write to TiKV
value, err = sctx.GetSessionVars().TemporaryTableTxnReader(txn, c.tblInfo).Get(ctx, key)
value, err = txn.Get(ctx, key)
} else if sctx.GetSessionVars().LazyCheckKeyNotExists() {
value, err = txn.GetMemBuffer().Get(ctx, key)
} else {
Expand Down
2 changes: 1 addition & 1 deletion table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck {
if t.meta.TempTableType != model.TempTableNone {
// Always check key for temporary table because it does not write to TiKV
_, err = sctx.GetSessionVars().TemporaryTableTxnReader(txn, t.meta).Get(ctx, key)
_, err = txn.Get(ctx, key)
} else if sctx.GetSessionVars().LazyCheckKeyNotExists() {
var v []byte
v, err = txn.GetMemBuffer().Get(ctx, key)
Expand Down

0 comments on commit cc53474

Please sign in to comment.