Skip to content

Commit

Permalink
roachpb: redefine read/write request divide, introduce locking concept
Browse files Browse the repository at this point in the history
This commit decouples the key-value layer notion of "writing" with "locking".
It does so by first redefining the difference between reads and writes as
a function of whether the requests go through Raft or not. Reads observe
state on a leaseholder (or follower) but do not create a WriteBatch that is
proposed through Raft. Writes observe state on a leaseholder and do create
a WriteBatch that is proposed through Raft.

With distinction consolidated, the change then moves on to introducing the
notion of "locking" at the key-value layer. Both read and write requests
are able to acquire locks, though at different durability levels. Reads
can only acquire unreplicated locks while writes can acquire replicated
locks.

Finally, the commit renames "TransactionWrite" to "IntentWrite", which is
more clear. The commit then audits situations where `IsIntentWrite` is used
and replaces them with `IsLocking` where appropriate.
  • Loading branch information
nvanbenschoten committed Mar 5, 2020
1 parent f048094 commit 278a21b
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 136 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/followerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Dur
// batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of
// requests that can be evaluated on a follower replica.
func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && ba.IsAllTransactional()
return !ba.IsLocking() && ba.IsAllTransactional()
}

// txnCanPerformFollowerRead determines if the provided transaction can perform
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2972,7 +2972,7 @@ func TestCanSendToFollower(t *testing.T) {
defer func() { CanSendToFollower = old }()
canSend := true
CanSendToFollower = func(_ uuid.UUID, _ *cluster.Settings, ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && canSend
return !ba.IsLocking() && canSend
}

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func DefaultDeclareKeys(
latchSpans, _ *spanset.SpanSet,
) {
var access spanset.SpanAccess
if roachpb.IsReadOnly(req) {
if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
access = spanset.SpanReadOnly
} else {
access = spanset.SpanReadWrite
Expand All @@ -47,7 +47,7 @@ func DefaultDeclareIsolatedKeys(
latchSpans, lockSpans *spanset.SpanSet,
) {
var access spanset.SpanAccess
if roachpb.IsReadOnly(req) {
if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
access = spanset.SpanReadOnly
} else {
access = spanset.SpanReadWrite
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool {
// We don't try to ack async consensus writes before application because we
// know that there isn't a client waiting for the result.
req := c.proposal.Request
return req.IsTransactionWrite() && !req.AsyncConsensus
return req.IsIntentWrite() && !req.AsyncConsensus
}

// AckSuccess implements the apply.CheckedCommand interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest,
for _, ru := range otherReqs {
req := ru.GetInner()
switch {
case roachpb.IsTransactionWrite(req) && !roachpb.IsRange(req):
case roachpb.IsIntentWrite(req) && !roachpb.IsRange(req):
// Concurrent point write.
writes++
case req.Method() == roachpb.QueryIntent:
Expand Down Expand Up @@ -100,7 +100,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest,
req := ru.GetInner()
seq := req.Header().Sequence
switch {
case roachpb.IsTransactionWrite(req) && !roachpb.IsRange(req):
case roachpb.IsIntentWrite(req) && !roachpb.IsRange(req):
// Concurrent point write.
case req.Method() == roachpb.QueryIntent:
// Earlier pipelined point write that hasn't been proven yet. We
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *Replica) canServeFollowerRead(
canServeFollowerRead := false
if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
ba.IsAllTransactional() && // followerreadsccl.batchCanBeEvaluatedOnFollower
(!ba.IsLocking() && ba.IsAllTransactional()) && // followerreadsccl.batchCanBeEvaluatedOnFollower
(ba.Txn == nil || !ba.Txn.IsLocking()) && // followerreadsccl.txnCanPerformFollowerRead
FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) {

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,15 @@ func (r *Replica) collectSpans(
//
// TODO(bdarnell): revisit as the local portion gets its appropriate
// use.
if ba.IsReadOnly() {
latchSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
} else {
if ba.IsLocking() {
guess := len(ba.Requests)
if et, ok := ba.GetArg(roachpb.EndTxn); ok {
// EndTxn declares a global write for each of its intent spans.
guess += len(et.(*roachpb.EndTxnRequest).IntentSpans) - 1
}
latchSpans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess)
} else {
latchSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
}

// For non-local, MVCC spans we annotate them with the request timestamp
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (tc *txnCommitter) canCommitInParallel(
for _, ru := range ba.Requests[:len(ba.Requests)-1] {
req := ru.GetInner()
switch {
case roachpb.IsTransactionWrite(req):
case roachpb.IsIntentWrite(req):
if roachpb.IsRange(req) {
// Similar to how we can't pipeline ranged writes, we also can't
// commit in parallel with them. The reason for this is that the
Expand Down
19 changes: 9 additions & 10 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,16 @@ func (h *txnHeartbeater) init(
func (h *txnHeartbeater) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
firstWriteIdx, pErr := firstWriteIndex(&ba)
firstLockingIndex, pErr := firstLockingIndex(&ba)
if pErr != nil {
return nil, pErr
}
haveTxnWrite := firstWriteIdx != -1
if haveTxnWrite {
if firstLockingIndex != -1 {
// Set txn key based on the key of the first transactional write if not
// already set. If it is already set, make sure we keep the anchor key
// the same.
if len(h.mu.txn.Key) == 0 {
anchor := ba.Requests[firstWriteIdx].GetInner().Header().Key
anchor := ba.Requests[firstLockingIndex].GetInner().Header().Key
h.mu.txn.Key = anchor
// Put the anchor also in the ba's copy of the txn, since this batch
// was prepared before we had an anchor.
Expand Down Expand Up @@ -396,19 +395,19 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
}
}

// firstWriteIndex returns the index of the first transactional write in the
// BatchRequest. Returns -1 if the batch has not intention to write. It also
// verifies that if an EndTxnRequest is included, then it is the last request
// in the batch.
func firstWriteIndex(ba *roachpb.BatchRequest) (int, *roachpb.Error) {
// firstLockingIndex returns the index of the first request that acquires locks
// in the BatchRequest. Returns -1 if the batch has no intention to acquire
// locks. It also verifies that if an EndTxnRequest is included, then it is the
// last request in the batch.
func firstLockingIndex(ba *roachpb.BatchRequest) (int, *roachpb.Error) {
for i, ru := range ba.Requests {
args := ru.GetInner()
if i < len(ba.Requests)-1 /* if not last*/ {
if _, ok := args.(*roachpb.EndTxnRequest); ok {
return -1, roachpb.NewErrorf("%s sent as non-terminal call", args.Method())
}
}
if roachpb.IsTransactionWrite(args) {
if roachpb.IsLocking(args) {
return i, nil
}
}
Expand Down
80 changes: 44 additions & 36 deletions pkg/kv/txn_interceptor_heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,50 +138,58 @@ func TestTxnHeartbeaterSetsTransactionKey(t *testing.T) {
require.Equal(t, keyB, roachpb.Key(txn.Key))
}

// TestTxnHeartbeaterLoopStartedOnFirstWrite tests that the txnHeartbeater
// doesn't start its heartbeat loop until it observes the transaction perform
// a write.
func TestTxnHeartbeaterLoopStartedOnFirstWrite(t *testing.T) {
// TestTxnHeartbeaterLoopStartedOnFirstLock tests that the txnHeartbeater
// doesn't start its heartbeat loop until it observes the transaction issues
// a request that will acquire locks.
func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
txn := makeTxnProto()
th, _, _ := makeMockTxnHeartbeater(&txn)
defer th.stopper.Stop(ctx)
testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
ctx := context.Background()
txn := makeTxnProto()
th, _, _ := makeMockTxnHeartbeater(&txn)
defer th.stopper.Stop(ctx)

// Read-only requests don't start the heartbeat loop.
keyA := roachpb.Key("a")
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: txn.Clone()}
ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
// Read-only requests don't start the heartbeat loop.
keyA := roachpb.Key("a")
keyAHeader := roachpb.RequestHeader{Key: keyA}
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: txn.Clone()}
ba.Add(&roachpb.GetRequest{RequestHeader: keyAHeader})

br, pErr := th.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
br, pErr := th.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

th.mu.Lock()
require.False(t, th.mu.loopStarted)
require.False(t, th.heartbeatLoopRunningLocked())
th.mu.Unlock()
th.mu.Lock()
require.False(t, th.mu.loopStarted)
require.False(t, th.heartbeatLoopRunningLocked())
th.mu.Unlock()

// The heartbeat loop is started on the first writing request.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
// The heartbeat loop is started on the first locking request.
ba.Requests = nil
if write {
ba.Add(&roachpb.PutRequest{RequestHeader: keyAHeader})
} else {
t.Skip("TODO(nvanbenschoten): uncomment")
// ba.Add(&roachpb.ScanRequest{RequestHeader: keyAHeader, KeyLocking: lock.Exclusive})
}

br, pErr = th.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
br, pErr = th.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

th.mu.Lock()
require.True(t, th.mu.loopStarted)
require.True(t, th.heartbeatLoopRunningLocked())
th.mu.Unlock()
th.mu.Lock()
require.True(t, th.mu.loopStarted)
require.True(t, th.heartbeatLoopRunningLocked())
th.mu.Unlock()

// Closing the interceptor stops the heartbeat loop.
th.mu.Lock()
th.closeLocked()
th.mu.Unlock()
waitForHeartbeatLoopToStop(t, &th)
require.True(t, th.mu.loopStarted) // still set
// Closing the interceptor stops the heartbeat loop.
th.mu.Lock()
th.closeLocked()
th.mu.Unlock()
waitForHeartbeatLoopToStop(t, &th)
require.True(t, th.mu.loopStarted) // still set
})
}

// TestTxnHeartbeaterLoopNotStartedFor1PC tests that the txnHeartbeater does
Expand Down
29 changes: 19 additions & 10 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -250,7 +251,7 @@ func (tp *txnPipeliner) attachWritesToEndTxn(
for _, ru := range ba.Requests[:len(ba.Requests)-1] {
req := ru.GetInner()
h := req.Header()
if roachpb.IsTransactionWrite(req) {
if roachpb.IsIntentWrite(req) {
// Ranged writes are added immediately to the intent spans because
// it's not clear where they will actually leave intents. Point
// writes are added to the in-flight writes set.
Expand Down Expand Up @@ -330,7 +331,7 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba roachpb.BatchRequest) roachpb.B
// If we're currently planning on performing the batch with
// performing async consensus, determine whether this request
// changes that.
if !roachpb.IsTransactionWrite(req) || roachpb.IsRange(req) {
if !roachpb.IsIntentWrite(req) || roachpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
Expand Down Expand Up @@ -449,23 +450,23 @@ func (tp *txnPipeliner) updateWriteTracking(
if br == nil {
// The transaction cannot continue in this epoch whether this is
// a retryable error or not.
ba.IntentSpanIterate(nil, tp.footprint.insert)
ba.LockSpanIterate(nil, tp.trackLocks)
return
}

// Similarly, if the transaction is now finalized, we don't need to
// accurately update the write tracking.
// accurately update the lock tracking.
if br.Txn.Status.IsFinalized() {
switch br.Txn.Status {
case roachpb.ABORTED:
// If the transaction is now ABORTED, add all intent writes from
// the batch directly to the write footprint. We don't know which
// of these succeeded.
ba.IntentSpanIterate(nil, tp.footprint.insert)
// If the transaction is now ABORTED, add all locks acquired by the
// batch directly to the lock footprint. We don't know which of
// these succeeded.
ba.LockSpanIterate(nil, tp.trackLocks)
case roachpb.COMMITTED:
// If the transaction is now COMMITTED, it must not have any more
// in-flight writes, so clear them. Technically we should move all
// of these to the write footprint, but since the transaction is
// of these to the lock footprint, but since the transaction is
// already committed, there's no reason to.
tp.ifWrites.clear(
/* reuse - we're not going to use this Btree again, so there's no point in
Expand All @@ -492,7 +493,7 @@ func (tp *txnPipeliner) updateWriteTracking(
// Move to write footprint.
tp.footprint.insert(roachpb.Span{Key: qiReq.Key})
}
} else if roachpb.IsTransactionWrite(req) {
} else if roachpb.IsIntentWrite(req) {
// If the request was a transactional write, track its intents.
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
Expand All @@ -510,6 +511,14 @@ func (tp *txnPipeliner) updateWriteTracking(
}
}

func (tp *txnPipeliner) trackLocks(s roachpb.Span, dur lock.Durability) {
// TODO(nvanbenschoten): handle unreplicated locks.
if dur != lock.Replicated {
panic("unexpected lock durability")
}
tp.footprint.insert(s)
}

// stripQueryIntents adjusts the BatchResponse to hide the fact that this
// interceptor added new requests to the batch. It returns an adjusted batch
// response without the responses that correspond to these added requests.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *txnSeqNumAllocator) SendLocked(
// Only increment the sequence number generator for requests that
// will leave intents or requests that will commit the transaction.
// This enables ba.IsCompleteTransaction to work properly.
if roachpb.IsTransactionWrite(req) || req.Method() == roachpb.EndTxn {
if roachpb.IsIntentWrite(req) || req.Method() == roachpb.EndTxn {
s.writeSeq++
}

Expand Down
Loading

0 comments on commit 278a21b

Please sign in to comment.