Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: cache RangeClosedTimestampPolicy in RangeCache, keep up to date #59505

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1862,18 +1862,22 @@ func (ds *DistSender) sendToReplicas(
}
}
prevReplica = curReplica
// Communicate to the server the information our cache has about the range.
// If it's stale, the serve will return an update.
// Communicate to the server the information our cache has about the
// range. If it's stale, the serve will return an update.
ba.ClientRangeInfo = roachpb.ClientRangeInfo{
// Note that DescriptorGeneration will be 0 if the cached descriptor is
// "speculative" (see DescSpeculative()). Even if the speculation is
// correct, we want the serve to return an update, at which point the
// cached entry will no longer be "speculative".
// Note that DescriptorGeneration will be 0 if the cached descriptor
// is "speculative" (see DescSpeculative()). Even if the speculation
// is correct, we want the serve to return an update, at which point
// the cached entry will no longer be "speculative".
DescriptorGeneration: routing.Desc().Generation,
// The LeaseSequence will be 0 if the cache doen't have lease info, or has
// a speculative lease. Like above, this asks the server to return an
// update.
// The LeaseSequence will be 0 if the cache doen't have lease info,
// or has a speculative lease. Like above, this asks the server to
// return an update.
LeaseSequence: routing.LeaseSeq(),
// The ClosedTimestampPolicy will be the default if the cache
// doesn't have info. Like above, this asks the server to return an
// update.
ClosedTimestampPolicy: routing.ClosedTimestampPolicy(),
}
br, err = transport.SendNext(ctx, ba)
ds.maybeIncrementErrCounters(br, err)
Expand Down
23 changes: 18 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,19 +942,24 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
case 1:
assert.Equal(t, desc.Generation, ba.ClientRangeInfo.DescriptorGeneration)
assert.Equal(t, lease1.Sequence, ba.ClientRangeInfo.LeaseSequence)
assert.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ba.ClientRangeInfo.ClosedTimestampPolicy)
contacted1 = true
return nil, errors.New("mock RPC error")
case 2:
// The client has cleared the lease in the cache after the failure of the
// first RPC.
assert.Equal(t, desc.Generation, ba.ClientRangeInfo.DescriptorGeneration)
assert.Equal(t, roachpb.LeaseSequence(0), ba.ClientRangeInfo.LeaseSequence)
assert.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ba.ClientRangeInfo.ClosedTimestampPolicy)
contacted2 = true
br := ba.CreateReply()
// Simulate the leaseholder returning updated lease info to the client.
// Simulate the leaseholder returning updated lease info to the
// client. Also simulate a downgrade away from a global reads closed
// ts policy.
br.RangeInfos = append(br.RangeInfos, roachpb.RangeInfo{
Desc: desc,
Lease: lease2,
Desc: desc,
Lease: lease2,
ClosedTimestampPolicy: roachpb.LAG_BY_CLUSTER_SETTING,
})
return br, nil
default:
Expand All @@ -977,8 +982,9 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {

ds := NewDistSender(cfg)
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: lease1,
Desc: desc,
Lease: lease1,
ClosedTimestampPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
})

var ba roachpb.BatchRequest
Expand All @@ -996,7 +1002,9 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
}

rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.Equal(t, desc, *rng.Desc())
require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID)
require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, rng.ClosedTimestampPolicy())
}

// TestRetryOnDescriptorLookupError verifies that the DistSender retries a descriptor
Expand Down Expand Up @@ -1725,6 +1733,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {
Replica: roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2},
Sequence: 1,
},
ClosedTimestampPolicy: roachpb.LEAD_FOR_GLOBAL_READS,
}},
{{
Desc: descSplit1,
Expand Down Expand Up @@ -1782,9 +1791,11 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {
require.Equal(t, &ri.Desc, entry.Desc())
if ri.Lease.Empty() {
require.Nil(t, entry.Leaseholder())
require.Nil(t, entry.Lease())
} else {
require.Equal(t, &ri.Lease, entry.Lease())
}
require.Equal(t, ri.ClosedTimestampPolicy, entry.ClosedTimestampPolicy())
}
})
}
Expand Down Expand Up @@ -3421,6 +3432,8 @@ func TestErrorIndexAlignment(t *testing.T) {

// TestCanSendToFollower tests that the DistSender abides by the result it
// get from CanSendToFollower.
// TODO(nvanbenschoten): update this test once ClosedTimestampPolicy begins
// dictating the decision of CanSendToFollower.
func TestCanSendToFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
72 changes: 56 additions & 16 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,14 @@ type EvictionToken struct {
// Evict().
rdc *RangeCache

// desc and lease represent the information retrieved from the cache. This can
// advance throughout the life of the token, as various methods re-synchronize
// with the cache. However, it it changes, the descriptor only changes to
// other "compatible" descriptors (same range id and key bounds).
desc roachpb.RangeDescriptor
lease roachpb.Lease
// desc, lease, and closedts represent the information retrieved from the
// cache. This can advance throughout the life of the token, as various
// methods re-synchronize with the cache. However, it it changes, the
// descriptor only changes to other "compatible" descriptors (same range id
// and key bounds).
desc roachpb.RangeDescriptor
lease roachpb.Lease
closedts roachpb.RangeClosedTimestampPolicy

// speculativeDesc, if not nil, is the descriptor that should replace desc if
// desc proves to be stale - i.e. speculativeDesc is inserted in the cache
Expand Down Expand Up @@ -250,6 +252,7 @@ func (rc *RangeCache) makeEvictionToken(
rdc: rc,
desc: entry.desc,
lease: entry.lease,
closedts: entry.closedts,
speculativeDesc: speculativeDesc,
}
}
Expand Down Expand Up @@ -309,6 +312,16 @@ func (et EvictionToken) LeaseSeq() roachpb.LeaseSequence {
return et.lease.Sequence
}

// ClosedTimestampPolicy returns the cache's current understanding of the
// range's closed timestamp policy. If no policy is known, the default policy of
// LAG_BY_CLUSTER_SETTING is returned.
func (et EvictionToken) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
if !et.Valid() {
panic("invalid ClosedTimestampPolicy() call on empty EvictionToken")
}
return et.closedts
}

// syncRLocked syncs the token with the cache. If the cache has a newer, but
// compatible, descriptor and lease, the token is updated. If not, the token is
// invalidated. The token is also invalidated if the cache doesn't contain an
Expand Down Expand Up @@ -466,6 +479,8 @@ func (et *EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachp
Desc: *et.speculativeDesc,
// We don't know anything about the new lease.
Lease: roachpb.Lease{},
// The closed timestamp policy likely hasn't changed.
ClosedTimestampPolicy: et.closedts,
})
} else {
log.Eventf(ctx, "evicting cached range descriptor")
Expand Down Expand Up @@ -509,7 +524,7 @@ func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (CacheEntry,
if err != nil {
return CacheEntry{}, err
}
return CacheEntry{tok.desc, tok.lease}, nil
return CacheEntry{tok.desc, tok.lease, tok.closedts}, nil
}

// GetCachedOverlapping returns all the cached entries which overlap a given
Expand Down Expand Up @@ -661,6 +676,8 @@ func (rc *RangeCache) tryLookup(
desc: rs[0],
// We don't have any lease information.
lease: roachpb.Lease{},
// We don't know the closed timestamp policy.
closedts: roachpb.LAG_BY_CLUSTER_SETTING,
}
for i, preR := range preRs {
newEntries[i+1] = &CacheEntry{desc: preR}
Expand All @@ -686,8 +703,9 @@ func (rc *RangeCache) tryLookup(
// TODO(andrei): It'd be better to retry the cache/database lookup in case 3.
if entry == nil {
entry = &CacheEntry{
desc: rs[0],
lease: roachpb.Lease{},
desc: rs[0],
lease: roachpb.Lease{},
closedts: roachpb.LAG_BY_CLUSTER_SETTING,
}
}
if len(rs) == 1 {
Expand Down Expand Up @@ -910,8 +928,9 @@ func (rc *RangeCache) insertLocked(ctx context.Context, rs ...roachpb.RangeInfo)
entries := make([]*CacheEntry, len(rs))
for i, r := range rs {
entries[i] = &CacheEntry{
desc: r.Desc,
lease: r.Lease,
desc: r.Desc,
lease: r.Lease,
closedts: r.ClosedTimestampPolicy,
}
}
return rc.insertLockedInner(ctx, entries)
Expand Down Expand Up @@ -1062,6 +1081,8 @@ type CacheEntry struct {
// range in here). This allows UpdateLease() to use Lease.Sequence to compare
// leases. Moreover, the lease will correspond to one of the replicas in Desc.
lease roachpb.Lease
// closedts indicates the range's closed timestamp policy.
closedts roachpb.RangeClosedTimestampPolicy
}

func (e CacheEntry) String() string {
Expand Down Expand Up @@ -1097,6 +1118,12 @@ func (e *CacheEntry) Lease() *roachpb.Lease {
return &e.lease
}

// ClosedTimestampPolicy returns the cached understanding of the range's closed
// timestamp policy. If no policy is known, LAG_BY_CLUSTER_SETTING is returned.
func (e *CacheEntry) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
return e.closedts
}

// DescSpeculative returns true if the descriptor in the entry is "speculative"
// - i.e. it doesn't correspond to a committed value. Such descriptors have been
// inserted in the cache with Generation=0.
Expand All @@ -1120,7 +1147,7 @@ func (e *CacheEntry) LeaseSpeculative() bool {
// e's and o'd descriptors overlap (and so they can't co-exist in the cache). A
// newer entry overrides an older entry. What entry is newer is decided based
// the descriptor's generation and, for equal generations, by the lease's
// sequence.
// sequence and, for equal lease sequences, by the closed timestamp policy.
//
// In situations where it can't be determined which entry represents newer
// information, e wins - the assumption is that o is already in the cache and we
Expand Down Expand Up @@ -1150,7 +1177,18 @@ func (e *CacheEntry) overrides(o *CacheEntry) bool {
e.Desc(), o.Desc()))
}

return compareEntryLeases(o, e) < 0
if res := compareEntryLeases(o, e); res != 0 {
return res < 0
}

// Equal lease sequences. Let's look at the closed timestamp policy. We
// don't assign sequence numbers to closed timestamp policy changes, so in
// cases where the two policies differ, we can't tell which one is old and
// which is new. So we conservatively say that if the policy changes, we
// will replace e with o. Closed timestamp policy changes are very rare, so
// minor thrashing in the cache is ok, as long as it eventually converges
// and we are left with a correct understanding of the policy.
return o.closedts != e.closedts
}

// compareEntryDescs returns -1, 0 or 1 depending on whether a's descriptor is
Expand Down Expand Up @@ -1265,8 +1303,9 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach
// what to do about it, though.

return true, &CacheEntry{
desc: e.desc,
lease: *l,
desc: e.desc,
lease: *l,
closedts: e.closedts,
}
}

Expand All @@ -1277,7 +1316,8 @@ func (e *CacheEntry) evictLeaseholder(
return false, e
}
return true, &CacheEntry{
desc: e.desc,
desc: e.desc,
closedts: e.closedts,
}
}

Expand Down
Loading