diff --git a/executor/batch_checker.go b/executor/batch_checker.go index ed0af152a2560..a5ef7efde964c 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -235,9 +235,9 @@ func formatDataForDupError(data []types.Datum) (string, error) { // getOldRow gets the table record row from storage for batch check. // t could be a normal table or a partition, but it must not be a PartitionedTable. -func getOldRow(ctx context.Context, sctx sessionctx.Context, kvGetter kv.Getter, t table.Table, handle kv.Handle, +func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle kv.Handle, genExprs []expression.Expression) ([]types.Datum, error) { - oldValue, err := kvGetter.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle)) + oldValue, err := txn.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle)) if err != nil { return nil, err } diff --git a/executor/insert.go b/executor/insert.go index c598232e895f1..f5b443387dd75 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -21,9 +21,8 @@ import ( "runtime/trace" "time" - "github.com/pingcap/parser/model" - "github.com/opentracing/opentracing-go" + "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -188,8 +187,8 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction } // updateDupRow updates a duplicate row to a new row. -func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, kvGetter kv.Getter, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error { - oldRow, err := getOldRow(ctx, e.ctx, kvGetter, row.t, handle, e.GenExprs) +func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error { + oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs) if err != nil { return err } @@ -237,7 +236,6 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D e.stats.Prefetch += time.Since(prefetchStart) } - txnValueGetter := e.txnValueGetter(txn) for i, r := range toBeCheckedRows { if r.handleKey != nil { handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey) @@ -245,7 +243,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D return err } - err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate) + err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) if err == nil { continue } @@ -255,7 +253,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - val, err := txnValueGetter.Get(ctx, uk.newKey) + val, err := txn.Get(ctx, uk.newKey) if err != nil { if kv.IsErrNotFound(err) { continue @@ -267,7 +265,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D return err } - err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate) + err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) if err != nil { if kv.IsErrNotFound(err) { // Data index inconsistent? A unique key provide the handle information, but the diff --git a/executor/insert_common.go b/executor/insert_common.go index 0ac00a9973067..f2356ff85776f 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -1065,7 +1065,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D e.stats.Prefetch += time.Since(prefetchStart) } - txnValueGetter := e.txnValueGetter(txn) // append warnings and get no duplicated error rows for i, r := range toBeCheckedRows { if r.ignored { @@ -1073,7 +1072,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D } skip := false if r.handleKey != nil { - _, err := txnValueGetter.Get(ctx, r.handleKey.newKey) + _, err := txn.Get(ctx, r.handleKey.newKey) if err == nil { e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr) continue @@ -1083,7 +1082,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D } } for _, uk := range r.uniqueKeys { - _, err := txnValueGetter.Get(ctx, uk.newKey) + _, err := txn.Get(ctx, uk.newKey) if err == nil { // If duplicate keys were found in BatchGet, mark row = nil. e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr) @@ -1112,15 +1111,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D return nil } -func (e *InsertValues) txnValueGetter(txn kv.Transaction) kv.Getter { - tblInfo := e.Table.Meta() - if tblInfo.TempTableType == model.TempTableNone { - return txn - } - - return e.ctx.GetSessionVars().TemporaryTableTxnReader(txn, tblInfo) -} - func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error { return e.addRecordWithAutoIDHint(ctx, row, 0) } diff --git a/executor/replace.go b/executor/replace.go index 78fb519049d84..cf96ec99320bd 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -62,9 +62,9 @@ func (e *ReplaceExec) Open(ctx context.Context) error { // removeRow removes the duplicate row and cleanup its keys in the key-value map, // but if the to-be-removed row equals to the to-be-added row, no remove or add things to do. -func (e *ReplaceExec) removeRow(ctx context.Context, kvGetter kv.Getter, handle kv.Handle, r toBeCheckedRow) (bool, error) { +func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle kv.Handle, r toBeCheckedRow) (bool, error) { newRow := r.row - oldRow, err := getOldRow(ctx, e.ctx, kvGetter, r.t, handle, e.GenExprs) + oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs) if err != nil { logutil.BgLogger().Error("get old row failed when replace", zap.String("handle", handle.String()), @@ -120,15 +120,14 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { return err } - txnValueGetter := e.txnValueGetter(txn) if r.handleKey != nil { handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey) if err != nil { return err } - if _, err := txnValueGetter.Get(ctx, r.handleKey.newKey); err == nil { - rowUnchanged, err := e.removeRow(ctx, txnValueGetter, handle, r) + if _, err := txn.Get(ctx, r.handleKey.newKey); err == nil { + rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { return err } @@ -144,7 +143,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // Keep on removing duplicated rows. for { - rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txnValueGetter, r) + rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txn, r) if err != nil { return err } @@ -171,9 +170,9 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 2. bool: true when found the duplicated key. This only means that duplicated key was found, // and the row was removed. // 3. error: the error. -func (e *ReplaceExec) removeIndexRow(ctx context.Context, kvGetter kv.Getter, r toBeCheckedRow) (bool, bool, error) { +func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - val, err := kvGetter.Get(ctx, uk.newKey) + val, err := txn.Get(ctx, uk.newKey) if err != nil { if kv.IsErrNotFound(err) { continue @@ -184,7 +183,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, kvGetter kv.Getter, r if err != nil { return false, true, err } - rowUnchanged, err := e.removeRow(ctx, kvGetter, handle, r) + rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { return false, true, err } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 63be07c8a18f2..dd937e848ee61 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -16,7 +16,6 @@ package variable import ( "bytes" - "context" "crypto/tls" "encoding/binary" "fmt" @@ -2301,66 +2300,3 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 { } return s.seekFactor } - -// TemporaryTableSnapshotReader can read the temporary table snapshot data -type TemporaryTableSnapshotReader struct { - temporaryTableData TemporaryTableData -} - -// Get gets the value for key k from snapshot. -func (s *TemporaryTableSnapshotReader) Get(ctx context.Context, k kv.Key) ([]byte, error) { - if s.temporaryTableData == nil { - return nil, kv.ErrNotExist - } - - v, err := s.temporaryTableData.Get(ctx, k) - if err != nil { - return v, err - } - - if len(v) == 0 { - return nil, kv.ErrNotExist - } - - return v, nil -} - -// TemporaryTableSnapshotReader can read the temporary table snapshot data -func (s *SessionVars) TemporaryTableSnapshotReader(tblInfo *model.TableInfo) *TemporaryTableSnapshotReader { - if tblInfo.TempTableType == model.TempTableGlobal { - return &TemporaryTableSnapshotReader{nil} - } - return &TemporaryTableSnapshotReader{s.TemporaryTableData} -} - -// TemporaryTableTxnReader can read the temporary table txn data -type TemporaryTableTxnReader struct { - memBuffer kv.MemBuffer - snapshot *TemporaryTableSnapshotReader -} - -// Get gets the value for key k from txn. -func (s *TemporaryTableTxnReader) Get(ctx context.Context, k kv.Key) ([]byte, error) { - v, err := s.memBuffer.Get(ctx, k) - if err == nil { - if len(v) == 0 { - return nil, kv.ErrNotExist - } - - return v, nil - } - - if !kv.IsErrNotFound(err) { - return v, err - } - - return s.snapshot.Get(ctx, k) -} - -// TemporaryTableTxnReader can read the temporary table txn data -func (s *SessionVars) TemporaryTableTxnReader(txn kv.Transaction, tblInfo *model.TableInfo) *TemporaryTableTxnReader { - return &TemporaryTableTxnReader{ - memBuffer: txn.GetMemBuffer(), - snapshot: s.TemporaryTableSnapshotReader(tblInfo), - } -} diff --git a/table/tables/index.go b/table/tables/index.go index b592c894f74ad..652a4572be57d 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -202,7 +202,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue var value []byte if c.tblInfo.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV - value, err = sctx.GetSessionVars().TemporaryTableTxnReader(txn, c.tblInfo).Get(ctx, key) + value, err = txn.Get(ctx, key) } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { value, err = txn.GetMemBuffer().Get(ctx, key) } else { diff --git a/table/tables/tables.go b/table/tables/tables.go index 90be7e8bf1a07..d47e89e4622f9 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -791,7 +791,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck { if t.meta.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV - _, err = sctx.GetSessionVars().TemporaryTableTxnReader(txn, t.meta).Get(ctx, key) + _, err = txn.Get(ctx, key) } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { var v []byte v, err = txn.GetMemBuffer().Get(ctx, key)