kv: ship timestamp cache summary during lease transfers and range merges
Fixes #57688.
Fixes #59679.
Fixes #60520.

This commit introduces new logic to ship summaries of a leaseholder's
timestamp cache through lease transfers and range merges. For lease
transfers, the read summary is sent from the outgoing leaseholder to the
incoming leaseholder. For range merges, the read summary is sent from
the right-hand side leaseholder (through the SubsumeResponse), to the
left-hand side leaseholder (through the MergeTrigger).

The read summaries take over the role that the lease start time and merge
freeze time used to play for lease transfers and range merges,
respectively: the summaries instruct the post-operation leaseholder on
how to update its timestamp cache to ensure that no future writes are
allowed to invalidate prior reads.
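
As a rough illustration of that role, here is a minimal sketch of an
incoming leaseholder forwarding its timestamp cache from a shipped summary.
`Timestamp`, `ReadSummary`, and `TimestampCache` are simplified,
hypothetical stand-ins and not the real `rspb` or timestamp cache types.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (hypothetical).
type Timestamp int64

// ReadSummary is a hypothetical low-resolution summary: reads may have
// been served on any key at or below LowWater.
type ReadSummary struct {
	LowWater Timestamp
}

// TimestampCache is a toy cache that tracks only a low-water mark.
type TimestampCache struct {
	lowWater Timestamp
}

// Apply forwards the cache so that it reflects every read covered by the
// summary. The mark only ratchets upward; it never regresses.
func (c *TimestampCache) Apply(sum ReadSummary) {
	if sum.LowWater > c.lowWater {
		c.lowWater = sum.LowWater
	}
}

// WriteTimestamp returns the timestamp at which a write requested at ts
// may be performed without invalidating a read recorded in the cache.
func (c *TimestampCache) WriteTimestamp(ts Timestamp) Timestamp {
	if ts <= c.lowWater {
		return c.lowWater + 1
	}
	return ts
}

func main() {
	// The outgoing leaseholder served reads up to timestamp 100.
	shipped := ReadSummary{LowWater: 100}

	// The incoming leaseholder applies the summary before serving writes.
	var incoming TimestampCache
	incoming.Apply(shipped)

	fmt.Println(incoming.WriteTimestamp(90))  // pushed above the prior reads
	fmt.Println(incoming.WriteTimestamp(150)) // already above them, unchanged
}
```

In this toy model a write below the shipped mark is pushed above it, which
is the guarantee the lease start time used to provide on its own.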

Read summaries have two distinct advantages over the old approach:
1. they can transfer a higher-resolution snapshot of the reads on the
    range through a lease transfer, which makes lease transfers less
    disruptive to writes because the timestamp cache won't be bumped as
    high. This avoids transaction aborts and retries after lease
    transfers and merges.
2. they can transfer information about reads with synthetic timestamps,
    which are not otherwise captured by the new lease's start time.
    Because of this, they are needed for correctness on `global_read`
    ranges, which can serve reads in the future.

This commit does not realize the first benefit, because it uses very
low-resolution read summaries. However, it sets up the infrastructure
that will allow us to realize the benefit in the future by capturing and
shipping higher-resolution read summaries. The commit does realize the
second benefit, as it fixes correctness issues around future time reads.
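
To make the resolution trade-off concrete, here is a small sketch comparing
a single-timestamp summary with one that also tracks individual keys. The
`Summary` type, its `PerKey` field, and the `Coarse` helper are
hypothetical and chosen only for illustration; they do not describe the
format this commit ships.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (hypothetical).
type Timestamp int64

// Summary is a toy read summary. LowWater covers every key on the range;
// PerKey optionally records later reads on individual keys.
type Summary struct {
	LowWater Timestamp
	PerKey   map[string]Timestamp
}

// Coarse builds the lowest-resolution summary: it assumes that every key
// may have been read at ts.
func Coarse(ts Timestamp) Summary {
	return Summary{LowWater: ts}
}

// MaxRead returns the latest read timestamp the summary implies for key.
func (s Summary) MaxRead(key string) Timestamp {
	ts := s.LowWater
	if k, ok := s.PerKey[key]; ok && k > ts {
		ts = k
	}
	return ts
}

// MustPush reports whether a write to key at ts would have to move to a
// higher timestamp to avoid invalidating a summarized read.
func (s Summary) MustPush(key string, ts Timestamp) bool {
	return ts <= s.MaxRead(key)
}

func main() {
	// Only key "a" was read recently (at 100); every other key was last
	// read at or before 10.
	coarse := Coarse(100)
	fine := Summary{LowWater: 10, PerKey: map[string]Timestamp{"a": 100}}

	fmt.Println(coarse.MustPush("b", 50)) // coarse summary pushes the write
	fmt.Println(fine.MustPush("b", 50))   // finer summary lets it through
	fmt.Println(fine.MustPush("a", 50))   // the key that was read is still protected
}
```

Both summaries are safe; the finer one simply pushes fewer writes.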

----

The commit also fixes a related bug that was revealed during the
development of this patch. As explained in #60520, it was possible for a
range merge to be applied to the leaseholder of the LHS of the merge
through a Raft snapshot. In such cases, we were not properly updating
the leaseholder's timestamp cache to reflect the reads served on the RHS
range. This could allow the post-merged range to invalidate reads served
by the pre-merge RHS range.

This commit fixes this bug using the new read summary infrastructure.
Merge triggers now write to the left-hand side's prior read summary with
a read summary gathered from the right-hand side during subsumption.
Later, upon ingesting a Raft snapshot, we check if we subsumed any
replicas and if we are the leaseholder. If both of those conditions are
true, we forward the replica's timestamp cache to the read summary on
the range. Since this read summary must have been updated by the merge
trigger, it will include all reads served on the pre-merge RHS range.
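
The two steps above can be sketched as follows. This is a toy model under
stated assumptions: `Replica`, `Snapshot`, `MergedSummary`, and
`IngestSnapshot` are hypothetical names, and the summary is reduced to a
single timestamp.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (hypothetical).
type Timestamp int64

// ReadSummary is a toy summary with a single high-water mark of reads.
type ReadSummary struct {
	LowWater Timestamp
}

// MergedSummary models the merge trigger: it combines the summary gathered
// from the RHS during subsumption with the LHS's prior summary, if any.
func MergedSummary(rhs ReadSummary, lhsPrior *ReadSummary) ReadSummary {
	merged := rhs
	if lhsPrior != nil && lhsPrior.LowWater > merged.LowWater {
		merged.LowWater = lhsPrior.LowWater
	}
	return merged
}

// Snapshot carries the range's persisted prior read summary and the number
// of replicas the receiver subsumed while ingesting it.
type Snapshot struct {
	PriorReadSummary *ReadSummary
	SubsumedReplicas int
}

// Replica is a hypothetical LHS replica with a low-water-only cache.
type Replica struct {
	IsLeaseholder   bool
	TSCacheLowWater Timestamp
}

// IngestSnapshot forwards the timestamp cache only when the replica
// subsumed at least one replica and currently holds the lease.
func (r *Replica) IngestSnapshot(s Snapshot) {
	if s.SubsumedReplicas == 0 || !r.IsLeaseholder || s.PriorReadSummary == nil {
		return
	}
	if s.PriorReadSummary.LowWater > r.TSCacheLowWater {
		r.TSCacheLowWater = s.PriorReadSummary.LowWater
	}
}

func main() {
	// The RHS served reads up to 120; the LHS's prior summary stops at 40.
	merged := MergedSummary(ReadSummary{LowWater: 120}, &ReadSummary{LowWater: 40})
	snap := Snapshot{PriorReadSummary: &merged, SubsumedReplicas: 1}

	leaseholder := &Replica{IsLeaseholder: true, TSCacheLowWater: 60}
	leaseholder.IngestSnapshot(snap)
	fmt.Println(leaseholder.TSCacheLowWater) // now covers the RHS reads

	follower := &Replica{IsLeaseholder: false, TSCacheLowWater: 60}
	follower.IngestSnapshot(snap)
	fmt.Println(follower.TSCacheLowWater) // unchanged
}
```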

----

Release note (bug fix): Fixes a very rare, possibly impossible in
practice, bug where a range merge that applied through a Raft snapshot
on the left-hand side range's leaseholder could allow that leaseholder
to serve writes that invalidated reads from before the merge on the
right-hand side.

Release justification: bug fix
nvanbenschoten committed Feb 24, 2021
1 parent 52ab049 commit a7472e3
Showing 52 changed files with 3,411 additions and 1,567 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -99,4 +99,4 @@ timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set
-version version 20.2-42 set the active cluster version in the format '<major>.<minor>'
+version version 20.2-44 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -101,6 +101,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>20.2-42</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>20.2-44</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
9 changes: 8 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
@@ -266,6 +266,10 @@ const (
// ForeignKeyRepresentationMigration is used to ensure that all no table
// descriptors use the pre-19.2 foreign key migration.
ForeignKeyRepresentationMigration
// PriorReadSummaries introduces support for the use of read summary objects
// to ship information about reads on a range through lease changes and
// range merges.
PriorReadSummaries

// Step (1): Add new versions here.
)
@@ -452,11 +456,14 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: NamespaceTableWithSchemasMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 40},
},

{
Key: ForeignKeyRepresentationMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 42},
},
{
Key: PriorReadSummaries,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 44},
},
// Step (2): Add new versions here.
})

5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/keys/constants.go
@@ -93,6 +93,9 @@ var (
// LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease
// index.
LocalLeaseAppliedIndexLegacySuffix = []byte("rlla")
// LocalRangePriorReadSummarySuffix is the suffix for a range's prior read
// summary.
LocalRangePriorReadSummarySuffix = []byte("rprs")
// LocalRangeVersionSuffix is the suffix for the range version.
LocalRangeVersionSuffix = []byte("rver")
// LocalRangeStatsLegacySuffix is the suffix for range statistics.
1 change: 1 addition & 0 deletions pkg/keys/doc.go
@@ -192,6 +192,7 @@ var _ = [...]interface{}{
RaftTruncatedStateLegacyKey, // "rftt"
RangeLeaseKey, // "rll-"
LeaseAppliedIndexLegacyKey, // "rlla"
RangePriorReadSummaryKey, // "rprs"
RangeVersionKey, // "rver"
RangeStatsLegacyKey, // "stat"

12 changes: 12 additions & 0 deletions pkg/keys/keys.go
@@ -271,6 +271,12 @@ func RangeLeaseKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLeaseKey()
}

// RangePriorReadSummaryKey returns a system-local key for a range's prior read
// summary.
func RangePriorReadSummaryKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangePriorReadSummaryKey()
}

// RangeStatsLegacyKey returns the key for accessing the MVCCStats struct for
// the specified Range ID. The key is no longer written to. Its responsibility
// has been subsumed by the RangeAppliedStateKey.
@@ -952,6 +958,12 @@ func (b RangeIDPrefixBuf) RangeLeaseKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeLeaseSuffix...)
}

// RangePriorReadSummaryKey returns a system-local key for a range's prior read
// summary.
func (b RangeIDPrefixBuf) RangePriorReadSummaryKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangePriorReadSummarySuffix...)
}

// RangeStatsLegacyKey returns the key for accessing the MVCCStats struct
// for the specified Range ID.
// See comment on RangeStatsLegacyKey function.
1 change: 1 addition & 0 deletions pkg/keys/printer.go
@@ -169,6 +169,7 @@ var (
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix},
{name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix},
{name: "RangeLease", suffix: LocalRangeLeaseSuffix},
{name: "RangePriorReadSummary", suffix: LocalRangePriorReadSummarySuffix},
{name: "RangeStats", suffix: LocalRangeStatsLegacySuffix},
{name: "RangeLastGC", suffix: LocalRangeLastGCSuffix},
{name: "RangeVersion", suffix: LocalRangeVersionSuffix},
1 change: 1 addition & 0 deletions pkg/keys/printer_test.go
@@ -74,6 +74,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState", revertSupportUnknown},
{keys.RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState", revertSupportUnknown},
{keys.RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease", revertSupportUnknown},
{keys.RangePriorReadSummaryKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangePriorReadSummary", revertSupportUnknown},
{keys.RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats", revertSupportUnknown},
{keys.RangeLastGCKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLastGC", revertSupportUnknown},
{keys.RangeVersionKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeVersion", revertSupportUnknown},
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -133,6 +133,8 @@ go_library(
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/split",
"//pkg/kv/kvserver/stateloader",
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -60,6 +60,8 @@ go_library(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/stateloader",
"//pkg/kv/kvserver/txnwait",
@@ -106,11 +108,14 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/readsummary",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
40 changes: 40 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -159,6 +160,13 @@ func declareKeysEndTxn(
Key: keys.MakeRangeIDReplicatedPrefix(mt.RightDesc.RangeID),
EndKey: keys.MakeRangeIDReplicatedPrefix(mt.RightDesc.RangeID).PrefixEnd(),
})
// Merges incorporate the prior read summary from the RHS into
// the LHS, which ensures that the current and all future
// leaseholders on the joint range respect reads served on the
// RHS.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.RangePriorReadSummaryKey(mt.LeftDesc.RangeID),
})
}
}
}
@@ -1103,6 +1111,38 @@ func mergeTrigger(
return result.Result{}, err
}

// If we collected a read summary from the right-hand side when freezing it,
// merge that summary into the left-hand side's prior read summary. In the
// usual case, the RightReadSummary in the MergeTrigger will be used to
// update the left-hand side's leaseholder's timestamp cache when applying
// the merge trigger's Raft log entry. However, if the left-hand side's
// leaseholder hears about the merge through a Raft snapshot, the merge
// trigger will not be available, so it will need to use the range's prior
// read summary to update its timestamp cache to ensure that it does not
// serve any writes that invalidate previous reads served on the right-hand
// side range. See TestStoreRangeMergeTimestampCache for an example of where
// this behavior is necessary.
//
// This communication from the RHS to the LHS is handled differently from
// how we copy over the abortspan. In this case, the read summary is passed
// through the SubsumeResponse and into the MergeTrigger. In the abortspan's
// case, we read from local RHS replica (which may not be the leaseholder)
// directly in this method. The primary reason why these are different is
// because the RHS's persistent read summary may not be up-to-date, as it is
// not updated by the SubsumeRequest.
readSumActive := rec.ClusterSettings().Version.IsActive(ctx, clusterversion.PriorReadSummaries)
if merge.RightReadSummary != nil && readSumActive {
mergedSum := merge.RightReadSummary.Clone()
if priorSum, err := readsummary.Load(ctx, batch, rec.GetRangeID()); err != nil {
return result.Result{}, err
} else if priorSum != nil {
mergedSum.Merge(*priorSum)
}
if err := readsummary.Set(ctx, batch, rec.GetRangeID(), ms, mergedSum); err != nil {
return result.Result{}, err
}
}

// The stats for the merged range are the sum of the LHS and RHS stats, less
// the RHS's replicated range ID stats. The only replicated range ID keys we
// copy from the RHS are the keys in the abort span, and we've already
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease.go
@@ -14,8 +14,11 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -52,6 +55,7 @@ func evalNewLease(
ms *enginepb.MVCCStats,
lease roachpb.Lease,
prevLease roachpb.Lease,
priorReadSum *rspb.ReadSummary,
isExtension bool,
isTransfer bool,
) (result.Result, error) {
@@ -125,6 +129,21 @@ func evalNewLease(
}
pd.Replicated.PrevLeaseProposal = prevLease.ProposedTS

// If we're setting a new prior read summary, store it to disk & in-memory.
// We elide this step in mixed-version clusters as old nodes would ignore
// the PriorReadSummary field (they don't know about it). It's possible that
// in this particular case we could get away with it (as the in-mem field
// only ever updates in-mem state) but it's easy to get things wrong (in
// which case they could easily take a catastrophic turn) and the benefit is
// low.
readSumActive := rec.ClusterSettings().Version.IsActive(ctx, clusterversion.PriorReadSummaries)
if priorReadSum != nil && readSumActive {
if err := readsummary.Set(ctx, readWriter, rec.GetRangeID(), ms, priorReadSum); err != nil {
return newFailedLeaseTrigger(isTransfer), err
}
pd.Replicated.PriorReadSummary = priorReadSum
}

pd.Local.Metrics = new(result.Metrics)
if isTransfer {
pd.Local.Metrics.LeaseTransferSuccess = 1
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -33,6 +34,7 @@ func declareKeysRequestLease(
// (see concurrency.shouldAcquireLatches). However, we continue to
// declare the keys in order to appease SpanSet assertions under race.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(rs.GetRangeID())})
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangePriorReadSummaryKey(rs.GetRangeID())})
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())})
}

@@ -134,6 +136,22 @@ func RequestLease(
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}
newLease.Start = effectiveStart

var priorReadSum *rspb.ReadSummary
if !prevLease.Equivalent(newLease) {
// If the new lease is not equivalent to the old lease (i.e. either the
// lease is changing hands or the leaseholder restarted), construct a
// read summary to instruct the new leaseholder on how to update its
// timestamp cache. Since we are not the leaseholder ourselves, we must
// pessimistically assume that prior leaseholders served reads all the
// way up to the start of the new lease.
//
// NB: this is equivalent to the leaseChangingHands condition in
// leasePostApplyLocked.
worstCaseSum := rspb.FromTimestamp(newLease.Start.ToTimestamp())
priorReadSum = &worstCaseSum
}

return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats,
-newLease, prevLease, isExtension, false /* isTransfer */)
+newLease, prevLease, priorReadSum, isExtension, false /* isTransfer */)
}