Skip to content

Commit

Permalink
Merge #59505
Browse files Browse the repository at this point in the history
59505: kv: cache RangeClosedTimestampPolicy in RangeCache, keep up to date r=nvanbenschoten a=nvanbenschoten

Informs #52745.

This commit introduces the concept of a RangeClosedTimestampPolicy, which represents the policy used by the leaseholder of a range to establish and publish closed timestamps. The policy dictates how far in the past (lag) or in the future (lead) MVCC history is closed off. Currently, there are two RangeClosedTimestampPolicy values:
- `LAG_BY_CLUSTER_SETTING`
- `LEAD_FOR_GLOBAL_READS`

After introducing these policies, the commit teaches the RangeCache about this information. In addition to a range's descriptor and lease, the cache will now maintain an up-to-date understanding of each range's closed timestamp policy.

Finally, the commit adds the policy to ClientRangeInfo and RangeInfo, so that the client <-> server RangeInfo protocol will ensure that the kv client is informed of each Range's closed timestamp policy and kept up to date when its cached information has gone stale.

Now that the kv client is aware of which ranges have configured their closed timestamps to serve global reads, it will be able to use this information in `CanSendToFollower` to target follower replicas for non-stale reads in read-only and read-write transactions.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Feb 6, 2021
2 parents 222cded + cdd48a0 commit b5f8b8c
Show file tree
Hide file tree
Showing 18 changed files with 1,491 additions and 1,154 deletions.
22 changes: 13 additions & 9 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1862,18 +1862,22 @@ func (ds *DistSender) sendToReplicas(
}
}
prevReplica = curReplica
// Communicate to the server the information our cache has about the range.
// If it's stale, the server will return an update.
// Communicate to the server the information our cache has about the
// range. If it's stale, the server will return an update.
ba.ClientRangeInfo = roachpb.ClientRangeInfo{
// Note that DescriptorGeneration will be 0 if the cached descriptor is
// "speculative" (see DescSpeculative()). Even if the speculation is
// correct, we want the server to return an update, at which point the
// cached entry will no longer be "speculative".
// Note that DescriptorGeneration will be 0 if the cached descriptor
// is "speculative" (see DescSpeculative()). Even if the speculation
// is correct, we want the server to return an update, at which point
// the cached entry will no longer be "speculative".
DescriptorGeneration: routing.Desc().Generation,
// The LeaseSequence will be 0 if the cache doesn't have lease info, or has
// a speculative lease. Like above, this asks the server to return an
// update.
// The LeaseSequence will be 0 if the cache doesn't have lease info,
// or has a speculative lease. Like above, this asks the server to
// return an update.
LeaseSequence: routing.LeaseSeq(),
// The ClosedTimestampPolicy will be the default if the cache
// doesn't have info. Like above, this asks the server to return an
// update.
ClosedTimestampPolicy: routing.ClosedTimestampPolicy(),
}
br, err = transport.SendNext(ctx, ba)
ds.maybeIncrementErrCounters(br, err)
Expand Down
23 changes: 18 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,19 +942,24 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
case 1:
assert.Equal(t, desc.Generation, ba.ClientRangeInfo.DescriptorGeneration)
assert.Equal(t, lease1.Sequence, ba.ClientRangeInfo.LeaseSequence)
assert.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ba.ClientRangeInfo.ClosedTimestampPolicy)
contacted1 = true
return nil, errors.New("mock RPC error")
case 2:
// The client has cleared the lease in the cache after the failure of the
// first RPC.
assert.Equal(t, desc.Generation, ba.ClientRangeInfo.DescriptorGeneration)
assert.Equal(t, roachpb.LeaseSequence(0), ba.ClientRangeInfo.LeaseSequence)
assert.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ba.ClientRangeInfo.ClosedTimestampPolicy)
contacted2 = true
br := ba.CreateReply()
// Simulate the leaseholder returning updated lease info to the client.
// Simulate the leaseholder returning updated lease info to the
// client. Also simulate a downgrade away from a global reads closed
// ts policy.
br.RangeInfos = append(br.RangeInfos, roachpb.RangeInfo{
Desc: desc,
Lease: lease2,
Desc: desc,
Lease: lease2,
ClosedTimestampPolicy: roachpb.LAG_BY_CLUSTER_SETTING,
})
return br, nil
default:
Expand All @@ -977,8 +982,9 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {

ds := NewDistSender(cfg)
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: lease1,
Desc: desc,
Lease: lease1,
ClosedTimestampPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
})

var ba roachpb.BatchRequest
Expand All @@ -996,7 +1002,9 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
}

rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.Equal(t, desc, *rng.Desc())
require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID)
require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, rng.ClosedTimestampPolicy())
}

// TestRetryOnDescriptorLookupError verifies that the DistSender retries a descriptor
Expand Down Expand Up @@ -1725,6 +1733,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {
Replica: roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2},
Sequence: 1,
},
ClosedTimestampPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
}},
{{
Desc: descSplit1,
Expand Down Expand Up @@ -1782,9 +1791,11 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {
require.Equal(t, &ri.Desc, entry.Desc())
if ri.Lease.Empty() {
require.Nil(t, entry.Leaseholder())
require.Nil(t, entry.Lease())
} else {
require.Equal(t, &ri.Lease, entry.Lease())
}
require.Equal(t, ri.ClosedTimestampPolicy, entry.ClosedTimestampPolicy())
}
})
}
Expand Down Expand Up @@ -3421,6 +3432,8 @@ func TestErrorIndexAlignment(t *testing.T) {

// TestCanSendToFollower tests that the DistSender abides by the result it
// gets from CanSendToFollower.
// TODO(nvanbenschoten): update this test once ClosedTimestampPolicy begins
// dictating the decision of CanSendToFollower.
func TestCanSendToFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
72 changes: 56 additions & 16 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,14 @@ type EvictionToken struct {
// Evict().
rdc *RangeCache

// desc and lease represent the information retrieved from the cache. This can
// advance throughout the life of the token, as various methods re-synchronize
// with the cache. However, if it changes, the descriptor only changes to
// other "compatible" descriptors (same range id and key bounds).
desc roachpb.RangeDescriptor
lease roachpb.Lease
// desc, lease, and closedts represent the information retrieved from the
// cache. This can advance throughout the life of the token, as various
// methods re-synchronize with the cache. However, if it changes, the
// descriptor only changes to other "compatible" descriptors (same range id
// and key bounds).
desc roachpb.RangeDescriptor
lease roachpb.Lease
closedts roachpb.RangeClosedTimestampPolicy

// speculativeDesc, if not nil, is the descriptor that should replace desc if
// desc proves to be stale - i.e. speculativeDesc is inserted in the cache
Expand Down Expand Up @@ -250,6 +252,7 @@ func (rc *RangeCache) makeEvictionToken(
rdc: rc,
desc: entry.desc,
lease: entry.lease,
closedts: entry.closedts,
speculativeDesc: speculativeDesc,
}
}
Expand Down Expand Up @@ -309,6 +312,16 @@ func (et EvictionToken) LeaseSeq() roachpb.LeaseSequence {
return et.lease.Sequence
}

// ClosedTimestampPolicy reports the closed timestamp policy that the cache
// currently associates with the token's range. When no policy is known, the
// default policy, LAG_BY_CLUSTER_SETTING, is reported.
//
// The method panics when invoked on a token that is not valid.
func (et EvictionToken) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
	if et.Valid() {
		return et.closedts
	}
	panic("invalid ClosedTimestampPolicy() call on empty EvictionToken")
}

// syncRLocked syncs the token with the cache. If the cache has a newer, but
// compatible, descriptor and lease, the token is updated. If not, the token is
// invalidated. The token is also invalidated if the cache doesn't contain an
Expand Down Expand Up @@ -466,6 +479,8 @@ func (et *EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachp
Desc: *et.speculativeDesc,
// We don't know anything about the new lease.
Lease: roachpb.Lease{},
// The closed timestamp policy likely hasn't changed.
ClosedTimestampPolicy: et.closedts,
})
} else {
log.Eventf(ctx, "evicting cached range descriptor")
Expand Down Expand Up @@ -509,7 +524,7 @@ func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (CacheEntry,
if err != nil {
return CacheEntry{}, err
}
return CacheEntry{tok.desc, tok.lease}, nil
return CacheEntry{tok.desc, tok.lease, tok.closedts}, nil
}

// GetCachedOverlapping returns all the cached entries which overlap a given
Expand Down Expand Up @@ -661,6 +676,8 @@ func (rc *RangeCache) tryLookup(
desc: rs[0],
// We don't have any lease information.
lease: roachpb.Lease{},
// We don't know the closed timestamp policy.
closedts: roachpb.LAG_BY_CLUSTER_SETTING,
}
for i, preR := range preRs {
newEntries[i+1] = &CacheEntry{desc: preR}
Expand All @@ -686,8 +703,9 @@ func (rc *RangeCache) tryLookup(
// TODO(andrei): It'd be better to retry the cache/database lookup in case 3.
if entry == nil {
entry = &CacheEntry{
desc: rs[0],
lease: roachpb.Lease{},
desc: rs[0],
lease: roachpb.Lease{},
closedts: roachpb.LAG_BY_CLUSTER_SETTING,
}
}
if len(rs) == 1 {
Expand Down Expand Up @@ -910,8 +928,9 @@ func (rc *RangeCache) insertLocked(ctx context.Context, rs ...roachpb.RangeInfo)
entries := make([]*CacheEntry, len(rs))
for i, r := range rs {
entries[i] = &CacheEntry{
desc: r.Desc,
lease: r.Lease,
desc: r.Desc,
lease: r.Lease,
closedts: r.ClosedTimestampPolicy,
}
}
return rc.insertLockedInner(ctx, entries)
Expand Down Expand Up @@ -1062,6 +1081,8 @@ type CacheEntry struct {
// range in here). This allows UpdateLease() to use Lease.Sequence to compare
// leases. Moreover, the lease will correspond to one of the replicas in Desc.
lease roachpb.Lease
// closedts indicates the range's closed timestamp policy.
closedts roachpb.RangeClosedTimestampPolicy
}

func (e CacheEntry) String() string {
Expand Down Expand Up @@ -1097,6 +1118,12 @@ func (e *CacheEntry) Lease() *roachpb.Lease {
return &e.lease
}

// ClosedTimestampPolicy reports the closed timestamp policy recorded in this
// cache entry. An entry for which no policy is known reports
// LAG_BY_CLUSTER_SETTING.
func (e *CacheEntry) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
	policy := e.closedts
	return policy
}

// DescSpeculative returns true if the descriptor in the entry is "speculative"
// - i.e. it doesn't correspond to a committed value. Such descriptors have been
// inserted in the cache with Generation=0.
Expand All @@ -1120,7 +1147,7 @@ func (e *CacheEntry) LeaseSpeculative() bool {
// e's and o's descriptors overlap (and so they can't co-exist in the cache). A
// newer entry overrides an older entry. Which entry is newer is decided based
// on the descriptor's generation and, for equal generations, by the lease's
// sequence.
// sequence and, for equal lease sequences, by the closed timestamp policy.
//
// In situations where it can't be determined which entry represents newer
// information, e wins - the assumption is that o is already in the cache and we
Expand Down Expand Up @@ -1150,7 +1177,18 @@ func (e *CacheEntry) overrides(o *CacheEntry) bool {
e.Desc(), o.Desc()))
}

return compareEntryLeases(o, e) < 0
if res := compareEntryLeases(o, e); res != 0 {
return res < 0
}

// Equal lease sequences. Let's look at the closed timestamp policy. We
// don't assign sequence numbers to closed timestamp policy changes, so in
// cases where the two policies differ, we can't tell which one is old and
// which is new. So we conservatively say that if the policy changes, we
// will replace o with e. Closed timestamp policy changes are very rare, so
// minor thrashing in the cache is ok, as long as it eventually converges
// and we are left with a correct understanding of the policy.
return o.closedts != e.closedts
}

// compareEntryDescs returns -1, 0 or 1 depending on whether a's descriptor is
Expand Down Expand Up @@ -1265,8 +1303,9 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach
// what to do about it, though.

return true, &CacheEntry{
desc: e.desc,
lease: *l,
desc: e.desc,
lease: *l,
closedts: e.closedts,
}
}

Expand All @@ -1277,7 +1316,8 @@ func (e *CacheEntry) evictLeaseholder(
return false, e
}
return true, &CacheEntry{
desc: e.desc,
desc: e.desc,
closedts: e.closedts,
}
}

Expand Down
Loading

0 comments on commit b5f8b8c

Please sign in to comment.