release-21.1: kvserver: fix write below closedts bug #63861

Merged: 6 commits, Apr 19, 2021
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_info.go
@@ -40,8 +40,10 @@ func LeaseInfo(
// If there's a lease request in progress, speculatively return that future
// lease.
reply.Lease = nextLease
reply.CurrentLease = &lease
} else {
reply.Lease = lease
}
reply.EvaluatedBy = cArgs.EvalCtx.StoreID()
return result.Result{}, nil
}
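The change above makes LeaseInfo populate both fields when a lease request is in flight: Lease carries the speculative next lease, while CurrentLease carries the lease that is actually in effect. A minimal sketch of how a caller might prefer the authoritative lease when it is present, mirroring the CurrentOrProspective helper used by the updated test below (illustrative only; the field names come from this diff, while the package and function are hypothetical):

```go
package leaseutil // hypothetical package, for illustration only

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// currentOrProspective prefers the authoritative current lease when the
// LeaseInfo response reports an in-flight lease request (CurrentLease set),
// and otherwise falls back to the possibly speculative Lease field.
func currentOrProspective(resp *roachpb.LeaseInfoResponse) roachpb.Lease {
	if resp.CurrentLease != nil {
		return *resp.CurrentLease
	}
	return resp.Lease
}
```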
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/client_replica_test.go
@@ -1424,10 +1424,11 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) {
// Test that a lease extension (a RequestLeaseRequest that doesn't change the
// lease holder) is not blocked by ongoing reads. The test relies on the fact
// that RequestLeaseRequest does not declare to touch the whole key span of the
// range, and thus don't conflict through the command queue with other reads.
// range, and thus doesn't conflict through latches with other reads.
func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
readBlocked := make(chan struct{})
cmdFilter := func(fArgs kvserverbase.FilterArgs) *roachpb.Error {
if fArgs.Hdr.UserPriority == 42 {
@@ -1449,7 +1450,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
},
})
s := srv.(*server.TestServer)
defer s.Stopper().Stop(context.Background())
defer s.Stopper().Stop(ctx)

store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
if err != nil {
@@ -1465,7 +1466,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
Key: key,
},
}
if _, pErr := kv.SendWrappedWith(context.Background(), s.DB().NonTransactionalSender(),
if _, pErr := kv.SendWrappedWith(ctx, s.DB().NonTransactionalSender(),
roachpb.Header{UserPriority: 42},
&getReq); pErr != nil {
errChan <- pErr.GoError()
@@ -1502,21 +1503,21 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
}

for {
curLease, _, err := s.GetRangeLease(context.Background(), key)
leaseInfo, _, err := s.GetRangeLease(ctx, key, server.AllowQueryToBeForwardedToDifferentNode)
if err != nil {
t.Fatal(err)
}
leaseReq.PrevLease = curLease
leaseReq.PrevLease = leaseInfo.CurrentOrProspective()

_, pErr := kv.SendWrapped(context.Background(), s.DB().NonTransactionalSender(), &leaseReq)
_, pErr := kv.SendWrapped(ctx, s.DB().NonTransactionalSender(), &leaseReq)
if _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError); ok {
log.Infof(context.Background(), "retrying lease after %s", pErr)
log.Infof(ctx, "retrying lease after %s", pErr)
continue
}
if _, ok := pErr.GetDetail().(*roachpb.LeaseRejectedError); ok {
// Lease rejected? Try again. The extension should work because
// extending is idempotent (assuming the PrevLease matches).
log.Infof(context.Background(), "retrying lease after %s", pErr)
log.Infof(ctx, "retrying lease after %s", pErr)
continue
}
if pErr != nil {
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"go.etcd.io/etcd/raft/v3"
)

@@ -572,3 +573,11 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

// AcquireLease is redirectOnOrAcquireLease exposed for tests.
func (r *Replica) AcquireLease(ctx context.Context) (kvserverpb.LeaseStatus, error) {
ctx = r.AnnotateCtx(ctx)
ctx = logtags.AddTag(ctx, "lease-acq", nil)
l, pErr := r.redirectOnOrAcquireLease(ctx)
return l, pErr.GoError()
}
243 changes: 240 additions & 3 deletions pkg/kv/kvserver/replica_closedts_test.go
@@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"sync"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -33,9 +34,9 @@ import (
)

// TestBumpSideTransportClosed tests the various states that a replica can find
// itself in when its TestBumpSideTransportClosed is called. It verifies that
// the method only returns successfully if it can bump its closed timestamp to
// the target.
// itself in when its BumpSideTransportClosed is called. It verifies that the
// method only returns successfully if it can bump its closed timestamp to the
// target.
func TestBumpSideTransportClosed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -413,6 +414,242 @@ func TestBumpSideTransportClosed(t *testing.T) {
}
}

// Test that a lease proposal that gets rejected doesn't erroneously dictate the
// closed timestamp of further requests. If it would, then writes could violate
// that closed timestamp. The tricky scenario tested is the following:
//
// 1. A lease held by rep1 is getting close to its expiration.
// 2. Rep1 begins the process of transferring its lease to rep2 with a start
// time of 100.
// 3. The transfer goes slowly. From the perspective of rep2, the original lease
// expires, so it begins acquiring a new lease with a start time of 200. The
// lease acquisition is slow to propose.
// 4. The lease transfer finally applies. Rep2 is the new leaseholder and bumps
// its tscache to 100.
// 5. Two writes start evaluating on rep2 under the new lease. They bump their
// write timestamp to 100,1.
// 6. Rep2's lease acquisition from step 3 is proposed. Here's where the
// regression that this test is protecting against comes in: if rep2 was to
// mechanically bump its assignedClosedTimestamp to 200, that'd be incorrect
// because there are in-flight writes at 100. If those writes get proposed
// after the lease acquisition request, the second of them to get proposed
// would violate the closed time carried by the first (see below).
// 7. The lease acquisition gets rejected below Raft because the previous lease
// it asserts doesn't correspond to the lease that it applies under.
// 8. The two writes from step 5 are proposed. The closed timestamp that they
// each carry has a lower bound of rep2.assignedClosedTimestamp. If this were
// 200, then the second one would violate the closed timestamp carried by the
// first one - the first one says that 200 is closed, but then the second
// tries to write at 100. Note that the first write is OK writing at 100 even
// though it carries a closed timestamp of 200 - the closed timestamp carried
// by a command only binds future commands.
//
// The test simulates the scenario and verifies that we don't crash with a
// closed timestamp violation assertion. We avoid the violation because, in step
// 6, the lease proposal doesn't bump the assignedClosedTimestamp.
func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// We're going to orchestrate the scenario by controlling the timing of the
// lease transfer, the lease acquisition and the writes. Note that we'll block
// the lease acquisition and the writes after they evaluate but before they
// get proposed, but we'll block the lease transfer when it's just about to be
// proposed, after it gets assigned the closed timestamp that it will carry.
// We want it to carry a relatively low closed timestamp, so we want its
// closed timestamp to be assigned before we bump the clock to expire the
// original lease.

// leaseTransferCh is used to block the lease transfer.
leaseTransferCh := make(chan struct{})
// leaseAcqCh is used to block the lease acquisition.
leaseAcqCh := make(chan struct{})
// writeCh is used to wait for the two writes to block.
writeCh := make(chan struct{})
// unblockWritesCh is used to unblock the two writes.
unblockWritesCh := make(chan struct{})
var writeKey1, writeKey2 atomic.Value
// Initialize the atomics so they get bound to a specific type.
writeKey1.Store(roachpb.Key{})
writeKey2.Store(roachpb.Key{})
var blockedRangeID int64
var trappedLeaseAcquisition int64

blockLeaseAcquisition := func(args kvserverbase.FilterArgs) {
blockedRID := roachpb.RangeID(atomic.LoadInt64(&blockedRangeID))
leaseReq, ok := args.Req.(*roachpb.RequestLeaseRequest)
if !ok || args.Hdr.RangeID != blockedRID || leaseReq.Lease.Replica.NodeID != 2 {
return
}
if atomic.CompareAndSwapInt64(&trappedLeaseAcquisition, 0, 1) {
leaseAcqCh <- struct{}{}
<-leaseAcqCh
}
}

blockWrites := func(args kvserverbase.FilterArgs) {
wk1 := writeKey1.Load().(roachpb.Key)
wk2 := writeKey2.Load().(roachpb.Key)
if put, ok := args.Req.(*roachpb.PutRequest); ok && (put.Key.Equal(wk1) || put.Key.Equal(wk2)) {
writeCh <- struct{}{}
<-unblockWritesCh
}
}

blockTransfer := func(p *kvserver.ProposalData) {
blockedRID := roachpb.RangeID(atomic.LoadInt64(&blockedRangeID))
ba := p.Request
if ba.RangeID != blockedRID {
return
}
_, ok := p.Request.GetArg(roachpb.TransferLease)
if !ok {
return
}
leaseTransferCh <- struct{}{}
<-leaseTransferCh
}

manual := hlc.NewHybridManualClock()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manual.UnixNano,
},
Store: &kvserver.StoreTestingKnobs{
DisableConsistencyQueue: true,
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingPostEvalFilter: func(args kvserverbase.FilterArgs) *roachpb.Error {
blockWrites(args)
blockLeaseAcquisition(args)
return nil
},
},
TestingProposalSubmitFilter: func(p *kvserver.ProposalData) (drop bool, _ error) {
blockTransfer(p)
return false, nil
},
},
},
}})
defer tc.Stopper().Stop(ctx)

manual.Pause()
// Upreplicate a range.
n1, n2 := tc.Servers[0], tc.Servers[1]
// One of the filters hardcodes a node id.
require.Equal(t, roachpb.NodeID(2), n2.NodeID())
key := tc.ScratchRangeWithExpirationLease(t)
s1 := tc.GetFirstStoreFromServer(t, 0)
t1, t2 := tc.Target(0), tc.Target(1)
repl0 := s1.LookupReplica(keys.MustAddr(key))
require.NotNil(t, repl0)
desc := *repl0.Desc()
tc.AddVotersOrFatal(t, key, t2)
require.NoError(t, tc.WaitForVoters(key, t2))
// Make sure the lease starts off on n1.
lease, _ /* now */, err := tc.FindRangeLease(desc, &t1 /* hint */)
require.NoError(t, err)
require.Equal(t, n1.NodeID(), lease.Replica.NodeID)

// Advance the time a bit. We'll then initiate a transfer, and we want the
// transferred lease to be valid for a while after the original lease expires.
remainingNanos := lease.GetExpiration().WallTime - manual.UnixNano()
// NOTE: We don't advance the clock past the mid-point of the lease, otherwise
// it gets extended.
pause1 := remainingNanos / 3
manual.Increment(pause1)

// Start a lease transfer from n1 to n2. We'll block the proposal of the transfer for a while.
atomic.StoreInt64(&blockedRangeID, int64(desc.RangeID))
transferErrCh := make(chan error)
go func() {
transferErrCh <- tc.TransferRangeLease(desc, t2)
}()
defer func() {
require.NoError(t, <-transferErrCh)
}()
// Wait for the lease transfer to evaluate and then block.
<-leaseTransferCh
// With the lease transfer still blocked, we now advance the clock beyond the
// original lease's expiration and we make n2 try to acquire a lease. This
// lease acquisition request will also be blocked.
manual.Increment(remainingNanos - pause1 + 1)
leaseAcqErrCh := make(chan error)
go func() {
r, _, err := n2.Stores().GetReplicaForRangeID(ctx, desc.RangeID)
if err != nil {
leaseAcqErrCh <- err
return
}
_, err = r.AcquireLease(ctx)
leaseAcqErrCh <- err
}()
// Wait for the lease acquisition to be blocked.
select {
case <-leaseAcqCh:
case err := <-leaseAcqErrCh:
t.Fatalf("lease request unexpectedly finished. err: %v", err)
}
// Let the previously blocked transfer succeed. n2's lease acquisition remains
// blocked.
close(leaseTransferCh)
// Wait until n2 has applied the lease transfer.
desc = *repl0.Desc()
testutils.SucceedsSoon(t, func() error {
li, _ /* now */, err := tc.FindRangeLeaseEx(ctx, desc, &t2 /* hint */)
if err != nil {
return err
}
lease = li.Current()
if !lease.OwnedBy(n2.GetFirstStoreID()) {
return errors.Errorf("n2 still unaware of its lease: %s", li.Current())
}
return nil
})

// Now we send two writes. We'll block them after evaluation. Then we'll
// unblock the lease acquisition, let the respective command fail to apply,
// and then we'll unblock the writes.
err1 := make(chan error)
err2 := make(chan error)
go func() {
writeKey1.Store(key)
sender := n2.DB().NonTransactionalSender()
pArgs := putArgs(key, []byte("test val"))
_, pErr := kv.SendWrappedWith(ctx, sender, roachpb.Header{Timestamp: lease.Start.ToTimestamp()}, pArgs)
err1 <- pErr.GoError()
}()
go func() {
k := key.Next()
writeKey2.Store(k)
sender := n2.DB().NonTransactionalSender()
pArgs := putArgs(k, []byte("test val2"))
_, pErr := kv.SendWrappedWith(ctx, sender, roachpb.Header{Timestamp: lease.Start.ToTimestamp()}, pArgs)
err2 <- pErr.GoError()
}()
// Wait for the writes to evaluate and block before proposal.
<-writeCh
<-writeCh

// Unblock the lease acquisition.
close(leaseAcqCh)
if err := <-leaseAcqErrCh; err != nil {
close(unblockWritesCh)
t.Fatal(err)
}

// Now unblock the writes.
close(unblockWritesCh)
require.NoError(t, <-err1)
require.NoError(t, <-err2)
// Not crashing with a closed timestamp violation assertion marks the success
// of this test.
}

// BenchmarkBumpSideTransportClosed measures the latency of a single call to
// (*Replica).BumpSideTransportClosed. The closed timestamp side-transport was
// designed with a performance expectation of this check taking no more than
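For intuition, the invariant the new test protects is that closed timestamps carried by commands are monotone along the Raft log: no command may write at or below the closed timestamp announced by an earlier command. A minimal sketch of that check, assuming the hlc.Timestamp and cockroachdb/errors packages used elsewhere in the tree (the real assertion lives in the below-Raft application path and differs in detail):

```go
package closedtsutil // hypothetical package, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// checkWriteAboveClosed returns an error if a command tries to write at or
// below a closed timestamp announced by a previously proposed command. In the
// scenario above, the rejected lease proposal must not raise prevClosed to
// 200 while the two writes at 100 are still waiting to be proposed.
func checkWriteAboveClosed(prevClosed, writeTS hlc.Timestamp) error {
	if writeTS.LessEq(prevClosed) {
		return errors.AssertionFailedf(
			"writing at %s below closed timestamp %s", writeTS, prevClosed)
	}
	return nil
}
```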
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -99,6 +99,7 @@ func newUnloadedReplica(
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps
r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter

if leaseHistoryMaxEntries > 0 {
r.leaseHistory = newLeaseHistory()
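The replica_init.go hunk above wires a new store testing knob into the proposal buffer. The knob's declaration is not part of this diff, but from its use in TestRejectedLeaseDoesntDictateClosedTimestamp its shape is roughly the following (assumed, for illustration):

```go
package kvserver // sketch: the knob would sit alongside StoreTestingKnobs

// TestingProposalSubmitFilter is the assumed shape of the knob wired above,
// inferred from the test's usage: the filter runs for each proposal just
// before it is handed to Raft; it can delay the proposal by blocking, or
// suppress it entirely by returning drop=true.
type TestingProposalSubmitFilter func(p *ProposalData) (drop bool, err error)
```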