From 1a6a2f7acaa7dd39062cd51a312081ff22b81fa1 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 3 Jun 2020 19:56:24 +0800 Subject: [PATCH 1/9] store/tikv: make snapshot thread safe (#17593) --- store/tikv/snapshot.go | 30 +++++++++++++++++++++--------- store/tikv/snapshot_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 2376c82adde5a..b17d8eb72b21d 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -68,7 +68,10 @@ type tikvSnapshot struct { // cached use len(value)=0 to represent a key-value entry doesn't exist (a reliable truth from TiKV). // In the BatchGet API, it use no key-value entry to represent non-exist. // It's OK as long as there are no zero-byte values in the protocol. - cached map[string][]byte + mu struct { + sync.RWMutex + cached map[string][]byte + } } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -88,7 +91,9 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) * func (s *tikvSnapshot) setSnapshotTS(ts uint64) { // Invalidate cache if the snapshotTS change! s.version.Ver = ts - s.cached = nil + s.mu.Lock() + s.mu.cached = nil + s.mu.Unlock() // And also the minCommitTS pushed information. s.minCommitTSPushed.data = make(map[uint64]struct{}, 5) } @@ -98,10 +103,11 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) { func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) { // Check the cached value first. m := make(map[string][]byte) - if s.cached != nil { + s.mu.RLock() + if s.mu.cached != nil { tmp := keys[:0] for _, key := range keys { - if val, ok := s.cached[string(key)]; ok { + if val, ok := s.mu.cached[string(key)]; ok { if len(val) > 0 { m[string(key)] = val } @@ -111,6 +117,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] } keys = tmp } + s.mu.RUnlock() if len(keys) == 0 { return m, nil @@ -142,12 +149,14 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] } // Update the cache. - if s.cached == nil { - s.cached = make(map[string][]byte, len(m)) + s.mu.Lock() + if s.mu.cached == nil { + s.mu.cached = make(map[string][]byte, len(m)) } for _, key := range keys { - s.cached[string(key)] = m[string(key)] + s.mu.cached[string(key)] = m[string(key)] } + s.mu.Unlock() return m, nil } @@ -314,11 +323,14 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { // Check the cached values first. - if s.cached != nil { - if value, ok := s.cached[string(k)]; ok { + s.mu.RLock() + if s.mu.cached != nil { + if value, ok := s.mu.cached[string(k)]; ok { + s.mu.RUnlock() return value, nil } } + s.mu.RUnlock() failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { if bo.ctx.Value("TestSnapshotCache") != nil { diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 024617ce7802a..38a5a8101a2cd 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -16,6 +16,7 @@ package tikv import ( "context" "fmt" + "sync" "time" . "github.com/pingcap/check" @@ -274,3 +275,28 @@ func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) { c.Assert(value, BytesEquals, []byte("y")) c.Assert(time.Since(start), Less, 500*time.Millisecond) } + +func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { + txn := s.beginTxn(c) + key := kv.Key("key_test_snapshot_threadsafe") + c.Assert(txn.Set(key, []byte("x")), IsNil) + ctx := context.Background() + err := txn.Commit(context.Background()) + c.Assert(err, IsNil) + + snapshot := newTiKVSnapshot(s.store, kv.MaxVersion, 0) + var wg sync.WaitGroup + wg.Add(5) + for i := 0; i < 5; i++ { + go func() { + for i := 0; i < 30; i++ { + _, err := snapshot.Get(ctx, key) + c.Assert(err, IsNil) + _, err = snapshot.BatchGet(ctx, []kv.Key{key, kv.Key("key_not_exist")}) + c.Assert(err, IsNil) + } + wg.Done() + }() + } + wg.Wait() +} From 24547e1017e9644f165ef51339f59d2eb740573f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 19 Jun 2020 21:36:57 +0800 Subject: [PATCH 2/9] move point get & batch point get init code to Open --- executor/batch_point_get.go | 101 +++++++++++++++++++----------------- executor/point_get.go | 31 +++++------ session/txn.go | 4 +- 3 files changed, 71 insertions(+), 65 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 83423669380af..ae7d39b215d5e 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -16,6 +16,7 @@ package executor import ( "context" "sort" + "sync/atomic" "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" @@ -34,23 +35,24 @@ import ( type BatchPointGetExec struct { baseExecutor - tblInfo *model.TableInfo - idxInfo *model.IndexInfo - handles []int64 - physIDs []int64 - partPos int - idxVals [][]types.Datum - startTS uint64 - snapshotTS uint64 - txn kv.Transaction - lock bool - waitTime int64 - inited bool - values [][]byte - index int - rowDecoder *rowcodec.ChunkDecoder - keepOrder bool - desc bool + tblInfo *model.TableInfo + idxInfo *model.IndexInfo + handles []int64 + physIDs []int64 + partPos int + idxVals [][]types.Datum + startTS uint64 + snapshotTS uint64 + txn kv.Transaction + lock bool + waitTime int64 + inited uint32 + values [][]byte + index int + rowDecoder *rowcodec.ChunkDecoder + keepOrder bool + desc bool + batchGetter kv.BatchGetter columns []*model.ColumnInfo // virtualColumnIndex records all the indices of virtual columns and sort them in definition @@ -74,6 +76,36 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *BatchPointGetExec) Open(context.Context) error { + e.snapshotTS = e.startTS + txnCtx := e.ctx.GetSessionVars().TxnCtx + if e.lock { + e.snapshotTS = txnCtx.GetForUpdateTS() + } + txn, err := e.ctx.Txn(false) + if err != nil { + return err + } + e.txn = txn + var snapshot kv.Snapshot + if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { + // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. + // The snapshot may contains cache that can reduce RPC call. + snapshot = txn.GetSnapshot() + } else { + snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) + if err != nil { + return err + } + } + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + var batchGetter kv.BatchGetter = snapshot + if txn.Valid() { + batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) + } + e.batchGetter = batchGetter return nil } @@ -85,11 +117,10 @@ func (e *BatchPointGetExec) Close() error { // Next implements the Executor interface. func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() - if !e.inited { + if atomic.CompareAndSwapUint32(&e.inited, 0, 1) { if err := e.initialize(ctx); err != nil { return err } - e.inited = true } if e.index >= len(e.values) { return nil @@ -111,38 +142,10 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { } func (e *BatchPointGetExec) initialize(ctx context.Context) error { - e.snapshotTS = e.startTS - txnCtx := e.ctx.GetSessionVars().TxnCtx - if e.lock { - e.snapshotTS = txnCtx.GetForUpdateTS() - } - txn, err := e.ctx.Txn(false) - if err != nil { - return err - } - e.txn = txn - var snapshot kv.Snapshot - if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { - // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. - // The snapshot may contains cache that can reduce RPC call. - snapshot = txn.GetSnapshot() - } else { - snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) - if err != nil { - return err - } - } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) - } - snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) - var batchGetter kv.BatchGetter = snapshot - if txn.Valid() { - batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) - } - var handleVals map[string][]byte var indexKeys []kv.Key + var err error + batchGetter := e.batchGetter if e.idxInfo != nil { // `SELECT a, b FROM t WHERE (a, b) IN ((1, 2), (1, 2), (2, 1), (1, 2))` should not return duplicated rows dedup := make(map[hack.MutableString]struct{}) diff --git a/executor/point_get.go b/executor/point_get.go index aa0e6a2b37552..a9ebf4db81281 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -112,21 +112,6 @@ func (e *PointGetExecutor) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *PointGetExecutor) Open(context.Context) error { - return nil -} - -// Close implements the Executor interface. -func (e *PointGetExecutor) Close() error { - return nil -} - -// Next implements the Executor interface. -func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - req.Reset() - if e.done { - return nil - } - e.done = true txnCtx := e.ctx.GetSessionVars().TxnCtx snapshotTS := e.startTS if e.lock { @@ -149,7 +134,23 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + return nil +} + +// Close implements the Executor interface. +func (e *PointGetExecutor) Close() error { + return nil +} + +// Next implements the Executor interface. +func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.done { + return nil + } + e.done = true var tblID int64 + var err error if e.partInfo != nil { tblID = e.partInfo.ID } else { diff --git a/session/txn.go b/session/txn.go index e068d921e6da8..227dc24a87764 100644 --- a/session/txn.go +++ b/session/txn.go @@ -310,7 +310,9 @@ func (st *TxnState) Get(ctx context.Context, k kv.Key) ([]byte, error) { // GetMemBuffer overrides the Transaction interface. func (st *TxnState) GetMemBuffer() kv.MemBuffer { if st.stmtBuf == nil || st.stmtBuf.Size() == 0 { - return st.Transaction.GetMemBuffer() + if st.Transaction != nil { + return st.Transaction.GetMemBuffer() + } } return kv.NewBufferStoreFrom(st.Transaction.GetMemBuffer(), st.stmtBuf) } From 2bbd0a9b4df9bdcb7b52ee1fb54bf83d4f50a24e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 29 Jun 2020 15:56:50 +0800 Subject: [PATCH 3/9] address comment --- executor/batch_point_get.go | 5 +++-- executor/point_get.go | 5 +++-- session/txn.go | 9 +++++++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index ae7d39b215d5e..8e3b05e366233 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -87,11 +87,12 @@ func (e *BatchPointGetExec) Open(context.Context) error { } e.txn = txn var snapshot kv.Snapshot - if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { + if txnCtx.StartTS == txnCtx.GetForUpdateTS() { // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. // The snapshot may contains cache that can reduce RPC call. snapshot = txn.GetSnapshot() - } else { + } + if snapshot == nil { snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) if err != nil { return err diff --git a/executor/point_get.go b/executor/point_get.go index a9ebf4db81281..44e991764b91f 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -122,9 +122,10 @@ func (e *PointGetExecutor) Open(context.Context) error { if err != nil { return err } - if e.txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { + if txnCtx.StartTS == txnCtx.GetForUpdateTS() { e.snapshot = e.txn.GetSnapshot() - } else { + } + if e.snapshot == nil { e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) if err != nil { return err diff --git a/session/txn.go b/session/txn.go index 227dc24a87764..211c5216bc331 100644 --- a/session/txn.go +++ b/session/txn.go @@ -82,6 +82,15 @@ func (st *TxnState) stmtBufSize() int { return st.stmtBuf.Size() } +// GetSnapshot implements the kv.Transaction interface. +func (st *TxnState) GetSnapshot() kv.Snapshot { + txn := st.Transaction + if txn != nil && txn.Valid() { + return txn.GetSnapshot() + } + return nil +} + func (st *TxnState) stmtBufGet(ctx context.Context, k kv.Key) ([]byte, error) { if st.stmtBuf == nil { return nil, kv.ErrNotExist From 9f6abc184971ec3c7b5c1b4383a81e1bd1035485 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 29 Jun 2020 16:36:49 +0800 Subject: [PATCH 4/9] address comment --- executor/batch_point_get.go | 102 +++++++++++++++++------------------- executor/point_get.go | 31 ++++++----- 2 files changed, 64 insertions(+), 69 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 8e3b05e366233..368b1cee299eb 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -16,7 +16,6 @@ package executor import ( "context" "sort" - "sync/atomic" "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" @@ -35,24 +34,23 @@ import ( type BatchPointGetExec struct { baseExecutor - tblInfo *model.TableInfo - idxInfo *model.IndexInfo - handles []int64 - physIDs []int64 - partPos int - idxVals [][]types.Datum - startTS uint64 - snapshotTS uint64 - txn kv.Transaction - lock bool - waitTime int64 - inited uint32 - values [][]byte - index int - rowDecoder *rowcodec.ChunkDecoder - keepOrder bool - desc bool - batchGetter kv.BatchGetter + tblInfo *model.TableInfo + idxInfo *model.IndexInfo + handles []int64 + physIDs []int64 + partPos int + idxVals [][]types.Datum + startTS uint64 + snapshotTS uint64 + txn kv.Transaction + lock bool + waitTime int64 + inited bool + values [][]byte + index int + rowDecoder *rowcodec.ChunkDecoder + keepOrder bool + desc bool columns []*model.ColumnInfo // virtualColumnIndex records all the indices of virtual columns and sort them in definition @@ -76,37 +74,6 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *BatchPointGetExec) Open(context.Context) error { - e.snapshotTS = e.startTS - txnCtx := e.ctx.GetSessionVars().TxnCtx - if e.lock { - e.snapshotTS = txnCtx.GetForUpdateTS() - } - txn, err := e.ctx.Txn(false) - if err != nil { - return err - } - e.txn = txn - var snapshot kv.Snapshot - if txnCtx.StartTS == txnCtx.GetForUpdateTS() { - // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. - // The snapshot may contains cache that can reduce RPC call. - snapshot = txn.GetSnapshot() - } - if snapshot == nil { - snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) - if err != nil { - return err - } - } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) - } - snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) - var batchGetter kv.BatchGetter = snapshot - if txn.Valid() { - batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) - } - e.batchGetter = batchGetter return nil } @@ -118,10 +85,11 @@ func (e *BatchPointGetExec) Close() error { // Next implements the Executor interface. func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() - if atomic.CompareAndSwapUint32(&e.inited, 0, 1) { + if !e.inited { if err := e.initialize(ctx); err != nil { return err } + e.inited = true } if e.index >= len(e.values) { return nil @@ -143,10 +111,38 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { } func (e *BatchPointGetExec) initialize(ctx context.Context) error { + e.snapshotTS = e.startTS + txnCtx := e.ctx.GetSessionVars().TxnCtx + if e.lock { + e.snapshotTS = txnCtx.GetForUpdateTS() + } + txn, err := e.ctx.Txn(false) + if err != nil { + return err + } + e.txn = txn + var snapshot kv.Snapshot + if txnCtx.StartTS == txnCtx.GetForUpdateTS() { + // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. + // The snapshot may contains cache that can reduce RPC call. + snapshot = txn.GetSnapshot() + } + if snapshot == nil { + snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) + if err != nil { + return err + } + } + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + var batchGetter kv.BatchGetter = snapshot + if txn.Valid() { + batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) + } var handleVals map[string][]byte var indexKeys []kv.Key - var err error - batchGetter := e.batchGetter if e.idxInfo != nil { // `SELECT a, b FROM t WHERE (a, b) IN ((1, 2), (1, 2), (2, 1), (1, 2))` should not return duplicated rows dedup := make(map[hack.MutableString]struct{}) diff --git a/executor/point_get.go b/executor/point_get.go index 44e991764b91f..ddd7d9b8dbb1b 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -112,6 +112,21 @@ func (e *PointGetExecutor) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *PointGetExecutor) Open(context.Context) error { + return nil +} + +// Close implements the Executor interface. +func (e *PointGetExecutor) Close() error { + return nil +} + +// Next implements the Executor interface. +func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.done { + return nil + } + e.done = true txnCtx := e.ctx.GetSessionVars().TxnCtx snapshotTS := e.startTS if e.lock { @@ -135,23 +150,7 @@ func (e *PointGetExecutor) Open(context.Context) error { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) - return nil -} - -// Close implements the Executor interface. -func (e *PointGetExecutor) Close() error { - return nil -} - -// Next implements the Executor interface. -func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - req.Reset() - if e.done { - return nil - } - e.done = true var tblID int64 - var err error if e.partInfo != nil { tblID = e.partInfo.ID } else { From fce7f1e9e11e2f49e20606a450e164e134302f09 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 29 Jun 2020 18:50:42 +0800 Subject: [PATCH 5/9] address comment --- executor/batch_point_get.go | 4 ++-- executor/point_get.go | 5 +++-- session/txn.go | 11 ++++++++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 368b1cee299eb..6ee448996bb5a 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -138,8 +138,8 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) var batchGetter kv.BatchGetter = snapshot - if txn.Valid() { - batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) + if mb := txn.GetMemBuffer(); mb != nil { + batchGetter = kv.NewBufferBatchGetter(mb, &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) } var handleVals map[string][]byte var indexKeys []kv.Key diff --git a/executor/point_get.go b/executor/point_get.go index ddd7d9b8dbb1b..d16d76f14ba11 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -278,10 +278,11 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro // get will first try to get from txn buffer, then check the pessimistic lock cache, // then the store. Kv.ErrNotExist will be returned if key is not found func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) { - if e.txn.Valid() && !e.txn.IsReadOnly() { + mb := e.txn.GetMemBuffer() + if mb != nil && !e.txn.IsReadOnly() { // We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be // different for pessimistic transaction. - val, err := e.txn.GetMemBuffer().Get(ctx, key) + val, err := mb.Get(ctx, key) if err == nil { return val, err } diff --git a/session/txn.go b/session/txn.go index 211c5216bc331..d78289445c087 100644 --- a/session/txn.go +++ b/session/txn.go @@ -318,12 +318,17 @@ func (st *TxnState) Get(ctx context.Context, k kv.Key) ([]byte, error) { // GetMemBuffer overrides the Transaction interface. func (st *TxnState) GetMemBuffer() kv.MemBuffer { + txn := st.Transaction if st.stmtBuf == nil || st.stmtBuf.Size() == 0 { - if st.Transaction != nil { - return st.Transaction.GetMemBuffer() + if txn != nil && txn.Valid() { + return txn.GetMemBuffer() } + return nil + } + if txn == nil || !txn.Valid() { + return st.stmtBuf } - return kv.NewBufferStoreFrom(st.Transaction.GetMemBuffer(), st.stmtBuf) + return kv.NewBufferStoreFrom(txn.GetMemBuffer(), st.stmtBuf) } // BatchGet overrides the Transaction interface. From ed3b375a70d94f7af80e4a3d8c57cd96b13ab53e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 30 Jun 2020 11:43:56 +0800 Subject: [PATCH 6/9] address comment --- executor/batch_point_get.go | 7 ++++--- executor/point_get.go | 40 +++++++++++++++++++------------------ 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 6ee448996bb5a..071e610af2678 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -112,7 +112,8 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *BatchPointGetExec) initialize(ctx context.Context) error { e.snapshotTS = e.startTS - txnCtx := e.ctx.GetSessionVars().TxnCtx + sessVars := e.ctx.GetSessionVars() + txnCtx := sessVars.TxnCtx if e.lock { e.snapshotTS = txnCtx.GetForUpdateTS() } @@ -122,7 +123,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } e.txn = txn var snapshot kv.Snapshot - if txnCtx.StartTS == txnCtx.GetForUpdateTS() { + if sessVars.InTxn() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. // The snapshot may contains cache that can reduce RPC call. snapshot = txn.GetSnapshot() @@ -138,7 +139,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) var batchGetter kv.BatchGetter = snapshot - if mb := txn.GetMemBuffer(); mb != nil { + if mb := txn.GetMemBuffer(); sessVars.InTxn() && mb != nil { batchGetter = kv.NewBufferBatchGetter(mb, &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) } var handleVals map[string][]byte diff --git a/executor/point_get.go b/executor/point_get.go index d16d76f14ba11..923f5a85cf0d3 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -127,7 +127,8 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } e.done = true - txnCtx := e.ctx.GetSessionVars().TxnCtx + sessVars := e.ctx.GetSessionVars() + txnCtx := sessVars.TxnCtx snapshotTS := e.startTS if e.lock { snapshotTS = txnCtx.GetForUpdateTS() @@ -137,7 +138,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil { return err } - if txnCtx.StartTS == txnCtx.GetForUpdateTS() { + if sessVars.InTxn() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { e.snapshot = e.txn.GetSnapshot() } if e.snapshot == nil { @@ -278,24 +279,25 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro // get will first try to get from txn buffer, then check the pessimistic lock cache, // then the store. Kv.ErrNotExist will be returned if key is not found func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) { - mb := e.txn.GetMemBuffer() - if mb != nil && !e.txn.IsReadOnly() { - // We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be - // different for pessimistic transaction. - val, err := mb.Get(ctx, key) - if err == nil { - return val, err - } - if !kv.IsErrNotFound(err) { - return nil, err - } - // key does not exist in mem buffer, check the lock cache - var ok bool - val, ok = e.ctx.GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key) - if ok { - return val, nil + if e.ctx.GetSessionVars().InTxn() && !e.txn.IsReadOnly() { + if mb := e.txn.GetMemBuffer(); mb != nil { + // We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be + // different for pessimistic transaction. + val, err := mb.Get(ctx, key) + if err == nil { + return val, err + } + if !kv.IsErrNotFound(err) { + return nil, err + } + // key does not exist in mem buffer, check the lock cache + var ok bool + val, ok = e.ctx.GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key) + if ok { + return val, nil + } + // fallthrough to snapshot get. } - // fallthrough to snapshot get. } return e.snapshot.Get(ctx, key) } From 632381f9ba3374130598c3fc8c66669b58a73b77 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 30 Jun 2020 12:00:14 +0800 Subject: [PATCH 7/9] address comment --- executor/batch_point_get.go | 6 ++++-- session/txn.go | 11 ++--------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 071e610af2678..94d226b928b6f 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -139,8 +139,10 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) var batchGetter kv.BatchGetter = snapshot - if mb := txn.GetMemBuffer(); sessVars.InTxn() && mb != nil { - batchGetter = kv.NewBufferBatchGetter(mb, &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) + if sessVars.InTxn() { + if mb := txn.GetMemBuffer(); mb != nil { + batchGetter = kv.NewBufferBatchGetter(mb, &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) + } } var handleVals map[string][]byte var indexKeys []kv.Key diff --git a/session/txn.go b/session/txn.go index d78289445c087..cb6c70098198c 100644 --- a/session/txn.go +++ b/session/txn.go @@ -318,17 +318,10 @@ func (st *TxnState) Get(ctx context.Context, k kv.Key) ([]byte, error) { // GetMemBuffer overrides the Transaction interface. func (st *TxnState) GetMemBuffer() kv.MemBuffer { - txn := st.Transaction if st.stmtBuf == nil || st.stmtBuf.Size() == 0 { - if txn != nil && txn.Valid() { - return txn.GetMemBuffer() - } - return nil - } - if txn == nil || !txn.Valid() { - return st.stmtBuf + return txn.GetMemBuffer() } - return kv.NewBufferStoreFrom(txn.GetMemBuffer(), st.stmtBuf) + return kv.NewBufferStoreFrom(st.Transaction.GetMemBuffer(), st.stmtBuf) } // BatchGet overrides the Transaction interface. From 2ec43466154fab21dae606c63774308f4a5183aa Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 30 Jun 2020 12:22:17 +0800 Subject: [PATCH 8/9] address comment --- session/txn.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/session/txn.go b/session/txn.go index cb6c70098198c..e068d921e6da8 100644 --- a/session/txn.go +++ b/session/txn.go @@ -82,15 +82,6 @@ func (st *TxnState) stmtBufSize() int { return st.stmtBuf.Size() } -// GetSnapshot implements the kv.Transaction interface. -func (st *TxnState) GetSnapshot() kv.Snapshot { - txn := st.Transaction - if txn != nil && txn.Valid() { - return txn.GetSnapshot() - } - return nil -} - func (st *TxnState) stmtBufGet(ctx context.Context, k kv.Key) ([]byte, error) { if st.stmtBuf == nil { return nil, kv.ErrNotExist @@ -319,7 +310,7 @@ func (st *TxnState) Get(ctx context.Context, k kv.Key) ([]byte, error) { // GetMemBuffer overrides the Transaction interface. func (st *TxnState) GetMemBuffer() kv.MemBuffer { if st.stmtBuf == nil || st.stmtBuf.Size() == 0 { - return txn.GetMemBuffer() + return st.Transaction.GetMemBuffer() } return kv.NewBufferStoreFrom(st.Transaction.GetMemBuffer(), st.stmtBuf) } From 77dad1e804462c05e689ddca67d1a5c4abe4fc2a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 30 Jun 2020 12:27:11 +0800 Subject: [PATCH 9/9] address comment --- executor/batch_point_get.go | 8 +++----- executor/point_get.go | 35 ++++++++++++++++------------------- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 94d226b928b6f..122a858e9631d 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -127,8 +127,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. // The snapshot may contains cache that can reduce RPC call. snapshot = txn.GetSnapshot() - } - if snapshot == nil { + } else { snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) if err != nil { return err @@ -140,10 +139,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) var batchGetter kv.BatchGetter = snapshot if sessVars.InTxn() { - if mb := txn.GetMemBuffer(); mb != nil { - batchGetter = kv.NewBufferBatchGetter(mb, &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) - } + batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot) } + var handleVals map[string][]byte var indexKeys []kv.Key if e.idxInfo != nil { diff --git a/executor/point_get.go b/executor/point_get.go index 923f5a85cf0d3..a12b2633ee3aa 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -140,8 +140,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { } if sessVars.InTxn() && txnCtx.StartTS == txnCtx.GetForUpdateTS() { e.snapshot = e.txn.GetSnapshot() - } - if e.snapshot == nil { + } else { e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) if err != nil { return err @@ -280,24 +279,22 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro // then the store. Kv.ErrNotExist will be returned if key is not found func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) { if e.ctx.GetSessionVars().InTxn() && !e.txn.IsReadOnly() { - if mb := e.txn.GetMemBuffer(); mb != nil { - // We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be - // different for pessimistic transaction. - val, err := mb.Get(ctx, key) - if err == nil { - return val, err - } - if !kv.IsErrNotFound(err) { - return nil, err - } - // key does not exist in mem buffer, check the lock cache - var ok bool - val, ok = e.ctx.GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key) - if ok { - return val, nil - } - // fallthrough to snapshot get. + // We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be + // different for pessimistic transaction. + val, err := e.txn.GetMemBuffer().Get(ctx, key) + if err == nil { + return val, err + } + if !kv.IsErrNotFound(err) { + return nil, err + } + // key does not exist in mem buffer, check the lock cache + var ok bool + val, ok = e.ctx.GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key) + if ok { + return val, nil } + // fallthrough to snapshot get. } return e.snapshot.Get(ctx, key) }