diff --git a/pkg/ccl/followerreadsccl/followerreads.go b/pkg/ccl/followerreadsccl/followerreads.go
index a5a19c092b23..5a649d41c4bc 100644
--- a/pkg/ccl/followerreadsccl/followerreads.go
+++ b/pkg/ccl/followerreadsccl/followerreads.go
@@ -76,7 +76,7 @@ func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Dur
 // batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of
 // requests that can be evaluated on a follower replica.
 func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
-	return ba.IsReadOnly() && ba.IsAllTransactional()
+	return !ba.IsLocking() && ba.IsAllTransactional()
 }
 
 // txnCanPerformFollowerRead determines if the provided transaction can perform
diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go
index 21a90015db18..9ca659f610d5 100644
--- a/pkg/kv/dist_sender_test.go
+++ b/pkg/kv/dist_sender_test.go
@@ -2972,7 +2972,7 @@ func TestCanSendToFollower(t *testing.T) {
 	defer func() { CanSendToFollower = old }()
 	canSend := true
 	CanSendToFollower = func(_ uuid.UUID, _ *cluster.Settings, ba roachpb.BatchRequest) bool {
-		return ba.IsReadOnly() && canSend
+		return !ba.IsLocking() && canSend
 	}
 
 	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go
index f2bbb1d69433..7252863b9ed2 100644
--- a/pkg/kv/kvserver/batcheval/declare.go
+++ b/pkg/kv/kvserver/batcheval/declare.go
@@ -27,7 +27,7 @@ func DefaultDeclareKeys(
 	latchSpans, _ *spanset.SpanSet,
 ) {
 	var access spanset.SpanAccess
-	if roachpb.IsReadOnly(req) {
+	if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
 		access = spanset.SpanReadOnly
 	} else {
 		access = spanset.SpanReadWrite
@@ -47,7 +47,7 @@ func DefaultDeclareIsolatedKeys(
 	latchSpans, lockSpans *spanset.SpanSet,
 ) {
 	var access spanset.SpanAccess
-	if roachpb.IsReadOnly(req) {
+	if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
 		access = spanset.SpanReadOnly
 	} else {
 		access = spanset.SpanReadWrite
diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go
index 6e2a46c0f0bd..f55e35c9fda0 100644
--- a/pkg/kv/kvserver/replica_application_cmd.go
+++ b/pkg/kv/kvserver/replica_application_cmd.go
@@ -130,7 +130,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool {
 	// We don't try to ack async consensus writes before application because we
 	// know that there isn't a client waiting for the result.
 	req := c.proposal.Request
-	return req.IsTransactionWrite() && !req.AsyncConsensus
+	return req.IsIntentWrite() && !req.AsyncConsensus
 }
 
 // AckSuccess implements the apply.CheckedCommand interface.
diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go
index cc1d15e1152c..9a5fc8bcb42e 100644
--- a/pkg/kv/kvserver/replica_batch_updates.go
+++ b/pkg/kv/kvserver/replica_batch_updates.go
@@ -69,7 +69,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest,
 	for _, ru := range otherReqs {
 		req := ru.GetInner()
 		switch {
-		case roachpb.IsTransactionWrite(req) && !roachpb.IsRange(req):
+		case roachpb.IsIntentWrite(req) && !roachpb.IsRange(req):
 			// Concurrent point write.
 			writes++
 		case req.Method() == roachpb.QueryIntent:
@@ -100,7 +100,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest,
 		req := ru.GetInner()
 		seq := req.Header().Sequence
 		switch {
-		case roachpb.IsTransactionWrite(req) && !roachpb.IsRange(req):
+		case roachpb.IsIntentWrite(req) && !roachpb.IsRange(req):
 			// Concurrent point write.
 		case req.Method() == roachpb.QueryIntent:
 			// Earlier pipelined point write that hasn't been proven yet. We
diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go
index 9311cecb8723..1f3d9024e1c6 100644
--- a/pkg/kv/kvserver/replica_follower_read.go
+++ b/pkg/kv/kvserver/replica_follower_read.go
@@ -54,7 +54,7 @@ func (r *Replica) canServeFollowerRead(
 	canServeFollowerRead := false
 	if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
 		lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
-		ba.IsAllTransactional() && // followerreadsccl.batchCanBeEvaluatedOnFollower
+		(!ba.IsLocking() && ba.IsAllTransactional()) && // followerreadsccl.batchCanBeEvaluatedOnFollower
 		(ba.Txn == nil || !ba.Txn.IsLocking()) && // followerreadsccl.txnCanPerformFollowerRead
 		FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) {
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 47d9184ab71e..de698fd02254 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -501,15 +501,15 @@ func (r *Replica) collectSpans(
 	//
 	// TODO(bdarnell): revisit as the local portion gets its appropriate
 	// use.
-	if ba.IsReadOnly() {
-		latchSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
-	} else {
+	if ba.IsLocking() {
 		guess := len(ba.Requests)
 		if et, ok := ba.GetArg(roachpb.EndTxn); ok {
 			// EndTxn declares a global write for each of its intent spans.
 			guess += len(et.(*roachpb.EndTxnRequest).IntentSpans) - 1
 		}
 		latchSpans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess)
+	} else {
+		latchSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
 	}
 
 	// For non-local, MVCC spans we annotate them with the request timestamp
diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go
index 95f5efd9f1b8..429893aa9889 100644
--- a/pkg/kv/txn_interceptor_committer.go
+++ b/pkg/kv/txn_interceptor_committer.go
@@ -303,7 +303,7 @@ func (tc *txnCommitter) canCommitInParallel(
 	for _, ru := range ba.Requests[:len(ba.Requests)-1] {
 		req := ru.GetInner()
 		switch {
-		case roachpb.IsTransactionWrite(req):
+		case roachpb.IsIntentWrite(req):
 			if roachpb.IsRange(req) {
 				// Similar to how we can't pipeline ranged writes, we also can't
 				// commit in parallel with them. The reason for this is that the
diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go
index 8754bc5655e3..15ce3909125e 100644
--- a/pkg/kv/txn_interceptor_heartbeater.go
+++ b/pkg/kv/txn_interceptor_heartbeater.go
@@ -141,17 +141,16 @@ func (h *txnHeartbeater) init(
 func (h *txnHeartbeater) SendLocked(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
-	firstWriteIdx, pErr := firstWriteIndex(&ba)
+	firstLockingIndex, pErr := firstLockingIndex(&ba)
 	if pErr != nil {
 		return nil, pErr
 	}
-	haveTxnWrite := firstWriteIdx != -1
-	if haveTxnWrite {
+	if firstLockingIndex != -1 {
 		// Set txn key based on the key of the first transactional write if not
 		// already set. If it is already set, make sure we keep the anchor key
 		// the same.
 		if len(h.mu.txn.Key) == 0 {
-			anchor := ba.Requests[firstWriteIdx].GetInner().Header().Key
+			anchor := ba.Requests[firstLockingIndex].GetInner().Header().Key
 			h.mu.txn.Key = anchor
 			// Put the anchor also in the ba's copy of the txn, since this batch
 			// was prepared before we had an anchor.
@@ -396,11 +395,11 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
 	}
 }
 
-// firstWriteIndex returns the index of the first transactional write in the
-// BatchRequest. Returns -1 if the batch has not intention to write. It also
-// verifies that if an EndTxnRequest is included, then it is the last request
-// in the batch.
-func firstWriteIndex(ba *roachpb.BatchRequest) (int, *roachpb.Error) {
+// firstLockingIndex returns the index of the first request that acquires locks
+// in the BatchRequest. Returns -1 if the batch has no intention to acquire
+// locks. It also verifies that if an EndTxnRequest is included, then it is the
+// last request in the batch.
+func firstLockingIndex(ba *roachpb.BatchRequest) (int, *roachpb.Error) {
 	for i, ru := range ba.Requests {
 		args := ru.GetInner()
 		if i < len(ba.Requests)-1 /* if not last*/ {
@@ -408,7 +407,7 @@ func firstWriteIndex(ba *roachpb.BatchRequest) (int, *roachpb.Error) {
 				return -1, roachpb.NewErrorf("%s sent as non-terminal call", args.Method())
 			}
 		}
-		if roachpb.IsTransactionWrite(args) {
+		if roachpb.IsLocking(args) {
 			return i, nil
 		}
 	}
diff --git a/pkg/kv/txn_interceptor_heartbeater_test.go b/pkg/kv/txn_interceptor_heartbeater_test.go
index c2a0af75b220..e4c3f8b79160 100644
--- a/pkg/kv/txn_interceptor_heartbeater_test.go
+++ b/pkg/kv/txn_interceptor_heartbeater_test.go
@@ -138,50 +138,58 @@ func TestTxnHeartbeaterSetsTransactionKey(t *testing.T) {
 	require.Equal(t, keyB, roachpb.Key(txn.Key))
 }
 
-// TestTxnHeartbeaterLoopStartedOnFirstWrite tests that the txnHeartbeater
-// doesn't start its heartbeat loop until it observes the transaction perform
-// a write.
-func TestTxnHeartbeaterLoopStartedOnFirstWrite(t *testing.T) {
+// TestTxnHeartbeaterLoopStartedOnFirstLock tests that the txnHeartbeater
+// doesn't start its heartbeat loop until it observes the transaction issue
+// a request that will acquire locks.
+func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	ctx := context.Background()
-	txn := makeTxnProto()
-	th, _, _ := makeMockTxnHeartbeater(&txn)
-	defer th.stopper.Stop(ctx)
+	testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
+		ctx := context.Background()
+		txn := makeTxnProto()
+		th, _, _ := makeMockTxnHeartbeater(&txn)
+		defer th.stopper.Stop(ctx)
 
-	// Read-only requests don't start the heartbeat loop.
-	keyA := roachpb.Key("a")
-	var ba roachpb.BatchRequest
-	ba.Header = roachpb.Header{Txn: txn.Clone()}
-	ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
+		// Read-only requests don't start the heartbeat loop.
+		keyA := roachpb.Key("a")
+		keyAHeader := roachpb.RequestHeader{Key: keyA}
+		var ba roachpb.BatchRequest
+		ba.Header = roachpb.Header{Txn: txn.Clone()}
+		ba.Add(&roachpb.GetRequest{RequestHeader: keyAHeader})
 
-	br, pErr := th.SendLocked(ctx, ba)
-	require.Nil(t, pErr)
-	require.NotNil(t, br)
+		br, pErr := th.SendLocked(ctx, ba)
+		require.Nil(t, pErr)
+		require.NotNil(t, br)
 
-	th.mu.Lock()
-	require.False(t, th.mu.loopStarted)
-	require.False(t, th.heartbeatLoopRunningLocked())
-	th.mu.Unlock()
+		th.mu.Lock()
+		require.False(t, th.mu.loopStarted)
+		require.False(t, th.heartbeatLoopRunningLocked())
+		th.mu.Unlock()
 
-	// The heartbeat loop is started on the first writing request.
-	ba.Requests = nil
-	ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
+		// The heartbeat loop is started on the first locking request.
+		ba.Requests = nil
+		if write {
+			ba.Add(&roachpb.PutRequest{RequestHeader: keyAHeader})
+		} else {
+			t.Skip("TODO(nvanbenschoten): uncomment")
+			// ba.Add(&roachpb.ScanRequest{RequestHeader: keyAHeader, KeyLocking: lock.Exclusive})
+		}
 
-	br, pErr = th.SendLocked(ctx, ba)
-	require.Nil(t, pErr)
-	require.NotNil(t, br)
+		br, pErr = th.SendLocked(ctx, ba)
+		require.Nil(t, pErr)
+		require.NotNil(t, br)
 
-	th.mu.Lock()
-	require.True(t, th.mu.loopStarted)
-	require.True(t, th.heartbeatLoopRunningLocked())
-	th.mu.Unlock()
+		th.mu.Lock()
+		require.True(t, th.mu.loopStarted)
+		require.True(t, th.heartbeatLoopRunningLocked())
+		th.mu.Unlock()
 
-	// Closing the interceptor stops the heartbeat loop.
-	th.mu.Lock()
-	th.closeLocked()
-	th.mu.Unlock()
-	waitForHeartbeatLoopToStop(t, &th)
-	require.True(t, th.mu.loopStarted) // still set
+		// Closing the interceptor stops the heartbeat loop.
+		th.mu.Lock()
+		th.closeLocked()
+		th.mu.Unlock()
+		waitForHeartbeatLoopToStop(t, &th)
+		require.True(t, th.mu.loopStarted) // still set
+	})
 }
 
 // TestTxnHeartbeaterLoopNotStartedFor1PC tests that the txnHeartbeater does
diff --git a/pkg/kv/txn_interceptor_pipeliner.go b/pkg/kv/txn_interceptor_pipeliner.go
index 2ffc551f5a3a..c82f3c01abf8 100644
--- a/pkg/kv/txn_interceptor_pipeliner.go
+++ b/pkg/kv/txn_interceptor_pipeliner.go
@@ -16,6 +16,7 @@ import (
 	"sort"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -250,7 +251,7 @@ func (tp *txnPipeliner) attachWritesToEndTxn(
 	for _, ru := range ba.Requests[:len(ba.Requests)-1] {
 		req := ru.GetInner()
 		h := req.Header()
-		if roachpb.IsTransactionWrite(req) {
+		if roachpb.IsIntentWrite(req) {
 			// Ranged writes are added immediately to the intent spans because
 			// it's not clear where they will actually leave intents. Point
 			// writes are added to the in-flight writes set.
@@ -330,7 +331,7 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba roachpb.BatchRequest) roachpb.B
 		// If we're currently planning on performing the batch with
 		// performing async consensus, determine whether this request
 		// changes that.
-		if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
+		if !roachpb.IsIntentWrite(req) || roachpb.IsRange(req) {
 			// Only allow batches consisting of solely transactional point
 			// writes to perform consensus asynchronously.
 			// TODO(nvanbenschoten): We could allow batches with reads and point
@@ -449,23 +450,23 @@ func (tp *txnPipeliner) updateWriteTracking(
 	if br == nil {
 		// The transaction cannot continue in this epoch whether this is
 		// a retryable error or not.
-		ba.IntentSpanIterate(nil, tp.footprint.insert)
+		ba.LockSpanIterate(nil, tp.trackLocks)
 		return
 	}
 
 	// Similarly, if the transaction is now finalized, we don't need to
-	// accurately update the write tracking.
+	// accurately update the lock tracking.
 	if br.Txn.Status.IsFinalized() {
 		switch br.Txn.Status {
 		case roachpb.ABORTED:
-			// If the transaction is now ABORTED, add all intent writes from
-			// the batch directly to the write footprint. We don't know which
-			// of these succeeded.
-			ba.IntentSpanIterate(nil, tp.footprint.insert)
+			// If the transaction is now ABORTED, add all locks acquired by the
+			// batch directly to the lock footprint. We don't know which of
+			// these succeeded.
+			ba.LockSpanIterate(nil, tp.trackLocks)
 		case roachpb.COMMITTED:
 			// If the transaction is now COMMITTED, it must not have any more
 			// in-flight writes, so clear them. Technically we should move all
-			// of these to the write footprint, but since the transaction is
+			// of these to the lock footprint, but since the transaction is
 			// already committed, there's no reason to.
 			tp.ifWrites.clear(
 				/* reuse - we're not going to use this Btree again, so there's no point in
@@ -492,7 +493,7 @@ func (tp *txnPipeliner) updateWriteTracking(
 			// Move to write footprint.
 			tp.footprint.insert(roachpb.Span{Key: qiReq.Key})
 		}
-	} else if roachpb.IsTransactionWrite(req) {
+	} else if roachpb.IsIntentWrite(req) {
 		// If the request was a transactional write, track its intents.
 		if ba.AsyncConsensus {
 			// Record any writes that were performed asynchronously. We'll
@@ -510,6 +511,14 @@ func (tp *txnPipeliner) updateWriteTracking(
 	}
 }
 
+func (tp *txnPipeliner) trackLocks(s roachpb.Span, dur lock.Durability) {
+	// TODO(nvanbenschoten): handle unreplicated locks.
+	if dur != lock.Replicated {
+		panic("unexpected lock durability")
+	}
+	tp.footprint.insert(s)
+}
+
 // stripQueryIntents adjusts the BatchResponse to hide the fact that this
 // interceptor added new requests to the batch. It returns an adjusted batch
 // response without the responses that correspond to these added requests.
diff --git a/pkg/kv/txn_interceptor_seq_num_allocator.go b/pkg/kv/txn_interceptor_seq_num_allocator.go
index 283b50620582..f5324ad1c215 100644
--- a/pkg/kv/txn_interceptor_seq_num_allocator.go
+++ b/pkg/kv/txn_interceptor_seq_num_allocator.go
@@ -87,7 +87,7 @@ func (s *txnSeqNumAllocator) SendLocked(
 		// Only increment the sequence number generator for requests that
 		// will leave intents or requests that will commit the transaction.
 		// This enables ba.IsCompleteTransaction to work properly.
-		if roachpb.IsTransactionWrite(req) || req.Method() == roachpb.EndTxn {
+		if roachpb.IsIntentWrite(req) || req.Method() == roachpb.EndTxn {
 			s.writeSeq++
 		}
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index ed93d5f16509..05cf7a2a2343 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -88,7 +88,8 @@ const (
 	isRead        // read-only cmds don't go through raft, but may run on lease holder
 	isWrite       // write cmds go through raft and must be proposed on lease holder
 	isTxn         // txn commands may be part of a transaction
-	isTxnWrite    // txn write cmds start heartbeat and are marked for intent resolution
+	isLocking     // locking cmds acquire locks for their transaction (implies isTxn)
+	isIntentWrite // intent write cmds leave intents when they succeed (implies isWrite and isLocking)
 	isRange       // range commands may span multiple keys
 	isReverse     // reverse commands traverse ranges in descending direction
 	isAlone       // requests which must be alone in a batch
@@ -102,14 +103,20 @@ const (
 	canBackpressure // commands which deserve backpressure when a Range grows too large
 )
 
-// IsReadOnly returns true iff the request is read-only.
+// IsReadOnly returns true iff the request is read-only. A request is
+// read-only if it does not go through raft, meaning that it cannot
+// change any replicated state. However, read-only requests may still
+// acquire locks with an unreplicated durability level; see IsLocking.
 func IsReadOnly(args Request) bool {
 	flags := args.flags()
 	return (flags&isRead) != 0 && (flags&isWrite) == 0
 }
 
 // IsReadAndWrite returns true if the request both reads and writes
-// (such as conditional puts and increments).
+// (such as conditional puts and increments). These requests contrast
+// with blind writes, which do not observe any state when modifying
+// replicated state and can be more freely re-ordered with other writes
+// as a result.
 func IsReadAndWrite(args Request) bool {
 	flags := args.flags()
 	return (flags&isRead) != 0 && (flags&isWrite) != 0
@@ -121,10 +128,25 @@ func IsTransactional(args Request) bool {
 	return (args.flags() & isTxn) != 0
 }
 
-// IsTransactionWrite returns true if the request produces write
-// intents when used within a transaction.
-func IsTransactionWrite(args Request) bool {
-	return (args.flags() & isTxnWrite) != 0
+// IsLocking returns true if the request acquires locks when used within
+// a transaction.
+func IsLocking(args Request) bool {
+	return (args.flags() & isLocking) != 0
+}
+
+// LockingDurability returns the durability of the locks acquired by the
+// request. The function assumes that IsLocking(args).
+func LockingDurability(args Request) lock.Durability {
+	if IsReadOnly(args) {
+		return lock.Unreplicated
+	}
+	return lock.Replicated
+}
+
+// IsIntentWrite returns true if the request produces write intents at
+// the request's sequence number when used within a transaction.
+func IsIntentWrite(args Request) bool {
+	return (args.flags() & isIntentWrite) != 0
 }
 
 // IsRange returns true if the command is range-based and must include
@@ -996,9 +1018,12 @@ func NewReverseScan(key, endKey Key) Request {
 	}
 }
 
-func (*GetRequest) flags() int { return isRead | isTxn | updatesTSCache | needsRefresh }
+func (*GetRequest) flags() int {
+	return isRead | isTxn | updatesTSCache | needsRefresh
+}
+
 func (*PutRequest) flags() int {
-	return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
+	return isWrite | isTxn | isLocking | isIntentWrite | consultsTSCache | canBackpressure
 }
 
 // ConditionalPut effectively reads without writing if it hits a
@@ -1007,7 +1032,7 @@ func (*PutRequest) flags() int {
 // they return an error immediately instead of continuing a serializable
 // transaction to be retried at end transaction.
 func (*ConditionalPutRequest) flags() int {
-	return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure
+	return isRead | isWrite | isTxn | isLocking | isIntentWrite | consultsTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure
 }
 
 // InitPut, like ConditionalPut, effectively reads without writing if it hits a
@@ -1016,7 +1041,7 @@ func (*ConditionalPutRequest) flags() int {
 // return an error immediately instead of continuing a serializable transaction
 // to be retried at end transaction.
 func (*InitPutRequest) flags() int {
-	return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure
+	return isRead | isWrite | isTxn | isLocking | isIntentWrite | consultsTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure
 }
 
 // Increment reads the existing value, but always leaves an intent so
@@ -1025,12 +1050,13 @@ func (*InitPutRequest) flags() int {
 // error immediately instead of continuing a serializable transaction
 // to be retried at end transaction.
 func (*IncrementRequest) flags() int {
-	return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
+	return isRead | isWrite | isTxn | isLocking | isIntentWrite | consultsTSCache | canBackpressure
 }
 
 func (*DeleteRequest) flags() int {
-	return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
+	return isWrite | isTxn | isLocking | isIntentWrite | consultsTSCache | canBackpressure
 }
+
 func (drr *DeleteRangeRequest) flags() int {
 	// DeleteRangeRequest has different properties if the "inline" flag is set.
 	// This flag indicates that the request is deleting inline MVCC values,
@@ -1054,7 +1080,7 @@ func (drr *DeleteRangeRequest) flags() int {
 	// anybody from writing under it. Note that, even if we didn't update the ts
 	// cache, deletes of keys that exist would not be lost (since the DeleteRange
 	// leaves intents on those keys), but deletes of "empty space" would.
-	return isWrite | isTxn | isTxnWrite | isRange | consultsTSCache | updatesTSCache | needsRefresh | canBackpressure
+	return isWrite | isTxn | isLocking | isIntentWrite | isRange | consultsTSCache | updatesTSCache | needsRefresh | canBackpressure
 }
 
 // Note that ClearRange commands cannot be part of a transaction as
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index ef9e49d657cf..2cc011447781 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -16,6 +16,7 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/errors"
@@ -99,7 +100,7 @@ func (ba *BatchRequest) IsReadOnly() bool {
 // RequiresLeaseHolder returns true if the request can only be served by the
 // leaseholders of the ranges it addresses.
 func (ba *BatchRequest) RequiresLeaseHolder() bool {
-	return !ba.IsReadOnly() || ba.Header.ReadConsistency.RequiresReadLease()
+	return ba.IsLocking() || ba.Header.ReadConsistency.RequiresReadLease()
 }
 
 // IsReverse returns true iff the BatchRequest contains a reverse request.
@@ -119,9 +120,14 @@ func (ba *BatchRequest) IsAllTransactional() bool {
 	return ba.hasFlagForAll(isTxn)
 }
 
-// IsTransactionWrite returns true iff the BatchRequest contains a txn write.
-func (ba *BatchRequest) IsTransactionWrite() bool {
-	return ba.hasFlag(isTxnWrite)
+// IsLocking returns true iff the BatchRequest intends to acquire locks.
+func (ba *BatchRequest) IsLocking() bool {
+	return ba.hasFlag(isLocking)
+}
+
+// IsIntentWrite returns true iff the BatchRequest contains an intent write.
+func (ba *BatchRequest) IsIntentWrite() bool {
+	return ba.hasFlag(isIntentWrite)
 }
 
 // IsUnsplittable returns true iff the BatchRequest an un-splittable request.
@@ -254,7 +260,7 @@ func (ba *BatchRequest) IsCompleteTransaction() bool {
 			return false
 		}
 		if seq == nextSeq {
-			if !IsTransactionWrite(req) {
+			if !IsIntentWrite(req) {
 				return false
 			}
 			nextSeq++
@@ -337,15 +343,15 @@ func (br *BatchResponse) String() string {
 	return strings.Join(str, ", ")
 }
 
-// IntentSpanIterate calls the passed method with the key ranges of the
-// transactional writes contained in the batch. Usually the key spans
+// LockSpanIterate calls the passed method with the key ranges of the
+// transactional locks contained in the batch. Usually the key spans
 // contained in the requests are used, but when a response contains a
-// ResumeSpan the ResumeSpan is subtracted from the request span to provide a
-// more minimal span of keys affected by the request.
-func (ba *BatchRequest) IntentSpanIterate(br *BatchResponse, fn func(Span)) {
+// ResumeSpan the ResumeSpan is subtracted from the request span to
+// provide a more minimal span of keys affected by the request.
+func (ba *BatchRequest) LockSpanIterate(br *BatchResponse, fn func(Span, lock.Durability)) {
 	for i, arg := range ba.Requests {
 		req := arg.GetInner()
-		if !IsTransactionWrite(req) {
+		if !IsLocking(req) {
 			continue
 		}
 		var resp Response
@@ -353,7 +359,7 @@ func (ba *BatchRequest) IntentSpanIterate(br *BatchResponse, fn func(Span)) {
 			resp = br.Responses[i].GetInner()
 		}
 		if span, ok := ActualSpan(req, resp); ok {
-			fn(span)
+			fn(span, LockingDurability(req))
 		}
 	}
 }
diff --git a/pkg/roachpb/batch_test.go b/pkg/roachpb/batch_test.go
index d05f052bc8b4..be3729284243 100644
--- a/pkg/roachpb/batch_test.go
+++ b/pkg/roachpb/batch_test.go
@@ -11,9 +11,11 @@
 package roachpb
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/kr/pretty"
@@ -215,58 +217,63 @@ func TestBatchRequestSummary(t *testing.T) {
 	}
 }
 
-func TestIntentSpanIterate(t *testing.T) {
-	testCases := []struct {
+func TestLockSpanIterate(t *testing.T) {
+	type testReq struct {
 		req    Request
 		resp   Response
 		span   Span
 		resume Span
-	}{
+	}
+	testReqs := []testReq{
 		{&ScanRequest{}, &ScanResponse{}, sp("a", "c"), sp("b", "c")},
 		{&ReverseScanRequest{}, &ReverseScanResponse{}, sp("d", "f"), sp("d", "e")},
-		{&DeleteRangeRequest{}, &DeleteRangeResponse{}, sp("g", "i"), sp("h", "i")},
+		{&PutRequest{}, &PutResponse{}, sp("m", ""), sp("", "")},
+		{&DeleteRangeRequest{}, &DeleteRangeResponse{}, sp("n", "p"), sp("o", "p")},
+		// TODO(nvanbenschoten): uncomment in the next commit.
+		// {&ScanRequest{KeyLocking: lock.Exclusive}, &ScanResponse{}, sp("g", "i"), sp("h", "i")},
+		// {&ReverseScanRequest{KeyLocking: lock.Exclusive}, &ReverseScanResponse{}, sp("j", "l"), sp("k", "l")},
 	}
 
-	// A batch request with a batch response with no ResumeSpan.
-	ba := BatchRequest{}
-	br := BatchResponse{}
-	for _, tc := range testCases {
-		tc.req.SetHeader(RequestHeaderFromSpan(tc.span))
-		ba.Add(tc.req)
-		br.Add(tc.resp)
-	}
+	// NB: can't import testutils for RunTrueAndFalse.
+	for _, resume := range []bool{false, true} {
+		t.Run(fmt.Sprintf("resume=%t", resume), func(t *testing.T) {
+			// A batch request paired with a batch response, optionally with ResumeSpans.
+			ba := BatchRequest{}
+			br := BatchResponse{}
+			for i := range testReqs {
+				tr := &testReqs[i]
+				tr.req.SetHeader(RequestHeaderFromSpan(tr.span))
+				ba.Add(tr.req)
+				if resume {
+					tr.resp.SetHeader(ResponseHeader{ResumeSpan: &tr.resume})
+				}
+				br.Add(tr.resp)
+			}
 
-	var spans []Span
-	fn := func(span Span) {
-		spans = append(spans, span)
-	}
-	ba.IntentSpanIterate(&br, fn)
-	// Only DeleteRangeResponse is a write request.
-	if e := 1; len(spans) != e {
-		t.Fatalf("unexpected number of spans: e = %d, found = %d", e, len(spans))
-	}
-	if e := testCases[2].span; !reflect.DeepEqual(e, spans[0]) {
-		t.Fatalf("unexpected spans: e = %+v, found = %+v", e, spans[0])
-	}
+			var spans [lock.MaxDurability + 1][]Span
+			fn := func(span Span, dur lock.Durability) {
+				spans[dur] = append(spans[dur], span)
+			}
+			ba.LockSpanIterate(&br, fn)
 
-	// A batch request with a batch response with a ResumeSpan.
-	ba = BatchRequest{}
-	br = BatchResponse{}
-	for _, tc := range testCases {
-		tc.req.SetHeader(RequestHeaderFromSpan(tc.span))
-		ba.Add(tc.req)
-		tc.resp.SetHeader(ResponseHeader{ResumeSpan: &tc.resume})
-		br.Add(tc.resp)
-	}
+			toExpSpans := func(trs ...testReq) []Span {
+				exp := make([]Span, len(trs))
+				for i, tr := range trs {
+					exp[i] = tr.span
+					if resume {
+						exp[i].EndKey = tr.resume.Key
+					}
+				}
+				return exp
+			}
 
-	spans = []Span{}
-	ba.IntentSpanIterate(&br, fn)
-	// Only DeleteRangeResponse is a write request.
-	if e := 1; len(spans) != e {
-		t.Fatalf("unexpected number of spans: e = %d, found = %d", e, len(spans))
-	}
-	if e := sp("g", "h"); !reflect.DeepEqual(e, spans[0]) {
-		t.Fatalf("unexpected spans: e = %+v, found = %+v", e, spans[0])
+			// The intent writes are replicated locking requests.
+			require.Equal(t, toExpSpans(testReqs[2], testReqs[3]), spans[lock.Replicated])
+
+			// The scans with KeyLocking are unreplicated locking requests.
+			require.Nil(t, spans[lock.Unreplicated])
+			// require.Equal(t, toExpSpans(testReqs[4], testReqs[5]), spans[lock.Unreplicated])
+		})
 	}
 }
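
Reviewer notes follow; these are illustrative sketches, not part of the patch.

The api.go hunks document an implication hierarchy among the request flags: isLocking implies isTxn, and isIntentWrite implies both isWrite and isLocking. The standalone Go toy below mirrors the flag names and a few representative flags() results from the diff (timestamp-cache and backpressure flags trimmed away) and checks those implications; it is not CockroachDB code.

// file: flag_hierarchy_sketch.go (illustrative toy, not CockroachDB code)
package main

import "fmt"

const (
	isRead = 1 << iota
	isWrite
	isTxn
	isLocking     // implies isTxn
	isIntentWrite // implies isWrite and isLocking
)

// reqFlags holds representative flags() results from the patch, trimmed to
// the flags relevant to the hierarchy.
var reqFlags = map[string]int{
	"Get":         isRead | isTxn,
	"Put":         isWrite | isTxn | isLocking | isIntentWrite,
	"Increment":   isRead | isWrite | isTxn | isLocking | isIntentWrite,
	"DeleteRange": isWrite | isTxn | isLocking | isIntentWrite,
}

func main() {
	for name, f := range reqFlags {
		// Verify the implications that the new flag comments document.
		if f&isLocking != 0 && f&isTxn == 0 {
			panic(name + ": isLocking must imply isTxn")
		}
		if f&isIntentWrite != 0 && (f&isWrite == 0 || f&isLocking == 0) {
			panic(name + ": isIntentWrite must imply isWrite and isLocking")
		}
		fmt.Printf("%-11s locking=%-5v intentWrite=%v\n",
			name, f&isLocking != 0, f&isIntentWrite != 0)
	}
}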
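
LockingDurability hangs off the same hierarchy: a locking request that is read-only never goes through raft, so any locks it acquires must be unreplicated, while an intent write leaves replicated intents. The sketch below shows how a LockSpanIterate consumer can bucket spans by durability, in the spirit of txnPipeliner.trackLocks and the updated TestLockSpanIterate; Span, Durability, and lockSpanIterate here are simplified stand-ins, not the real roachpb and lock types.

// file: lock_spans_sketch.go (illustrative toy, not CockroachDB code)
package main

import "fmt"

type Durability int

const (
	Replicated   Durability = iota // locks held as replicated intents
	Unreplicated                   // locks held only in leaseholder memory
	MaxDurability = Unreplicated
)

type Span struct{ Key, EndKey string }

type lockedSpan struct {
	span Span
	dur  Durability
}

// lockSpanIterate stands in for BatchRequest.LockSpanIterate: it calls fn
// once per locking request in the batch.
func lockSpanIterate(locked []lockedSpan, fn func(Span, Durability)) {
	for _, ls := range locked {
		fn(ls.span, ls.dur)
	}
}

func main() {
	var spans [MaxDurability + 1][]Span
	lockSpanIterate([]lockedSpan{
		{Span{"m", ""}, Replicated},    // point write
		{Span{"n", "p"}, Replicated},   // ranged write
		{Span{"g", "i"}, Unreplicated}, // locking scan
	}, func(s Span, d Durability) {
		spans[d] = append(spans[d], s)
	})
	fmt.Println("replicated:  ", spans[Replicated])
	fmt.Println("unreplicated:", spans[Unreplicated])
}

With the real types, trackLocks currently panics on anything but lock.Replicated, so only replicated spans reach the lock footprint until unreplicated locks are handled.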
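
The server-side hunks (declare.go, replica_send.go, followerreads.go) all pivot on the same predicate shift, from read-only versus write to locking versus non-locking: a locking read declares write-level latches and disqualifies its batch from follower reads. Below is a condensed sketch of that rule; the ScanForUpdate descriptor is hypothetical, anticipating the commented-out KeyLocking scans.

// file: latch_access_sketch.go (illustrative toy, not CockroachDB code)
package main

import "fmt"

type req struct {
	name     string
	readOnly bool // stands in for roachpb.IsReadOnly
	locking  bool // stands in for roachpb.IsLocking
}

// latchAccess mirrors the predicate in DefaultDeclareKeys:
// if IsReadOnly(req) && !IsLocking(req) { SpanReadOnly } else { SpanReadWrite }.
func latchAccess(r req) string {
	if r.readOnly && !r.locking {
		return "SpanReadOnly"
	}
	return "SpanReadWrite"
}

func main() {
	for _, r := range []req{
		{"Get", true, false},
		{"ScanForUpdate", true, true}, // hypothetical locking scan (unreplicated locks)
		{"Put", false, true},
	} {
		// Follower-read eligibility flips on the same predicate: per the
		// followerreads.go hunk, only non-locking transactional batches qualify.
		fmt.Printf("%-13s latch=%-13s followerReadOK=%v\n",
			r.name, latchAccess(r), !r.locking)
	}
}

This is also why replica_send.go now reserves read-write latch spans for any locking batch rather than for any non-read-only batch.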