diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 8903bdfcc9..4d81e1481a 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -275,6 +275,12 @@ func (req *Request) EnableStaleRead() { req.ReplicaRead = false } +// DisableStaleReadMeetLock is called when stale-read fallbacks to leader read after meeting key-is-locked error. +func (req *Request) DisableStaleReadMeetLock() { + req.StaleRead = false + req.ReplicaReadType = kv.ReplicaReadLeader +} + // IsGlobalStaleRead checks if the request is a global stale read request. func (req *Request) IsGlobalStaleRead() bool { return req.ReadReplicaScope == oracle.GlobalTxnScope && diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index a1d14d4687..b4671d6def 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -381,6 +381,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, s.mergeRegionRequestStats(cli.Stats) }() } + isStaleness := s.mu.isStaleness + busyThresholdMs := s.mu.busyThreshold.Milliseconds() s.mu.RUnlock() pending := batch.keys @@ -400,13 +402,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, ResourceControlContext: &kvrpcpb.ResourceControlContext{ ResourceGroupName: s.mu.resourceGroupName, }, - BusyThresholdMs: uint32(s.mu.busyThreshold.Milliseconds()), + BusyThresholdMs: uint32(busyThresholdMs), }) if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil { s.mu.resourceGroupTagger(req) } scope := s.mu.readReplicaScope - isStaleness := s.mu.isStaleness matchStoreLabels := s.mu.matchStoreLabels replicaAdjuster := s.mu.replicaReadAdjuster s.mu.RUnlock() @@ -504,6 +505,11 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, } else { cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken) } + // we need to read from leader after resolving the lock. + if isStaleness { + isStaleness = false + busyThresholdMs = 0 + } resolveLocksOpts := txnlock.ResolveLocksOptions{ CallerStartTS: s.version, Locks: locks, @@ -694,6 +700,11 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] return nil, err } if firstLock == nil { + // we need to read from leader after resolving the lock. + if isStaleness { + req.DisableStaleReadMeetLock() + req.BusyThresholdMs = 0 + } firstLock = lock } else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID { // If it is an autocommit point get, it needs to be blocked only