Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-78 set the active cluster version in the format '<major>.<minor>'
version version 21.2-80 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-78</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-80</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ const (
// ExperimentalMVCCRangeTombstones enables the use of highly experimental MVCC
// range tombstones.
ExperimentalMVCCRangeTombstones
// LooselyCoupledRaftLogTruncation allows the cluster to reduce the coupling
// for raft log truncation, by allowing each replica to treat a truncation
// proposal as an upper bound on what should be truncated.
LooselyCoupledRaftLogTruncation

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -498,6 +502,10 @@ var versionsSingleton = keyedVersions{
Key: ExperimentalMVCCRangeTombstones,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 78},
},
{
Key: LooselyCoupledRaftLogTruncation,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 80},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ go_library(
"queue_helpers_testutil.go",
"raft.go",
"raft_log_queue.go",
"raft_log_truncator.go",
"raft_snapshot_queue.go",
"raft_transport.go",
"raft_truncator_replica.go",
"replica.go",
"replica_application_cmd.go",
"replica_application_cmd_buf.go",
Expand Down Expand Up @@ -164,6 +166,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
Expand Down Expand Up @@ -252,6 +255,7 @@ go_test(
"queue_concurrency_test.go",
"queue_test.go",
"raft_log_queue_test.go",
"raft_log_truncator_test.go",
"raft_test.go",
"raft_transport_test.go",
"raft_transport_unit_test.go",
Expand Down
47 changes: 33 additions & 14 deletions pkg/kv/kvserver/batcheval/cmd_truncate_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -83,13 +84,32 @@ func TruncateLog(
return result.Result{}, errors.Wrap(err, "getting term")
}

// Compute the number of bytes freed by this truncation. Note that this will
// only make sense for the leaseholder as we base this off its own first
// index (other replicas may have other first indexes assuming we're not
// still using the legacy truncated state key). In principle, this could be
// off either way, though in practice we don't expect followers to have
// a first index smaller than the leaseholder's (see #34287), and most of
// the time everyone's first index should be the same.
// Compute the number of bytes freed by this truncation. Note that using
// firstIndex only makes sense for the leaseholder as we base this off its
// own first index (other replicas may have other first indexes). In
// principle, this could be off either way, though in practice we don't
// expect followers to have a first index smaller than the leaseholder's
// (see #34287), and most of the time everyone's first index should be the
// same.
// Additionally, it is possible that a write-heavy range has multiple
// in-flight TruncateLogRequests, and using the firstIndex will result in
// duplicate accounting. The ExpectedFirstIndex, populated for clusters at
// LooselyCoupledRaftLogTruncation, allows us to avoid this problem.
//
// We have an additional source of error not mitigated by
// ExpectedFirstIndex. There is nothing synchronizing firstIndex with the
// state visible in readWriter. The former uses the in-memory state or
// fetches directly from the Engine. The latter uses Engine state from some
// point in time which can fall anywhere in the time interval starting from
// when the readWriter was created up to where we create an MVCCIterator
// below.
// TODO(sumeer): we can eliminate this error as part of addressing
// https://github.com/cockroachdb/cockroach/issues/55461 and
// https://github.com/cockroachdb/cockroach/issues/70974 that discuss taking
// a consistent snapshot of some Replica state and the engine.
if args.ExpectedFirstIndex > firstIndex {
firstIndex = args.ExpectedFirstIndex
}
start := keys.RaftLogKey(rangeID, firstIndex)
end := keys.RaftLogKey(rangeID, args.Index)

Expand All @@ -98,12 +118,8 @@ func TruncateLog(
// downstream of Raft.
//
// Note that any sideloaded payloads that may be removed by this truncation
// don't matter; they're not tracked in the raft log delta.
//
// TODO(tbg): it's difficult to prove that this computation doesn't have
// bugs that let it diverge. It might be easier to compute the stats
// from scratch, stopping when 4mb (defaultRaftLogTruncationThreshold)
// is reached as at that point we'll truncate aggressively anyway.
// are not tracked in the raft log delta. The delta will be adjusted below
// raft.
iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{UpperBound: end})
defer iter.Close()
// We can pass zero as nowNanos because we're only interested in SysBytes.
Expand All @@ -122,7 +138,10 @@ func TruncateLog(
pd.Replicated.State = &kvserverpb.ReplicaState{
TruncatedState: tState,
}

pd.Replicated.RaftLogDelta = ms.SysBytes
if cArgs.EvalCtx.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).IsActive(
clusterversion.LooselyCoupledRaftLogTruncation) {
pd.Replicated.RaftExpectedFirstIndex = firstIndex
}
return pd, nil
}
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -62,10 +63,12 @@ func TestTruncateLog(t *testing.T) {
firstIndex = 100
)

st := cluster.MakeTestingClusterSettings()
evalCtx := &MockEvalCtx{
Desc: &roachpb.RangeDescriptor{RangeID: rangeID},
Term: term,
FirstIndex: firstIndex,
ClusterSettings: st,
Desc: &roachpb.RangeDescriptor{RangeID: rangeID},
Term: term,
FirstIndex: firstIndex,
}

eng := storage.NewDefaultInMemForTesting()
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ func (p *Result) MergeAndDestroy(q Result) error {

if p.Replicated.State.TruncatedState == nil {
p.Replicated.State.TruncatedState = q.Replicated.State.TruncatedState
p.Replicated.RaftExpectedFirstIndex = q.Replicated.RaftExpectedFirstIndex
} else if q.Replicated.State.TruncatedState != nil {
return errors.AssertionFailedf("conflicting TruncatedState")
}
q.Replicated.State.TruncatedState = nil
q.Replicated.RaftExpectedFirstIndex = 0

if q.Replicated.State.GCThreshold != nil {
if p.Replicated.State.GCThreshold == nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,25 @@ message ReplicatedEvalResult {
storage.enginepb.MVCCStats deprecated_delta = 10; // See #18828
storage.enginepb.MVCCStatsDelta delta = 18 [(gogoproto.nullable) = false];
ChangeReplicas change_replicas = 12;

// RaftLogDelta is the delta in bytes caused by truncation of the raft log.
// It is only populated when evaluating a TruncateLogRequest. The inclusive
// index for the truncation is specified in State.TruncatedState. This delta
// is computed under the assumption that the truncation is happening over
// the interval [RaftExpectedFirstIndex, index]. If the actual truncation at
// a replica is over some interval [x, index] where x !=
// RaftExpectedFirstIndex it is that replica's job to recalculate this delta
// in order to be accurate, or to make note of the fact that its raft log
// size stats may now be inaccurate.
//
// NB: this delta does not include the byte size of sideloaded entries.
// Sideloaded entries are not expected to be common enough that it is worth
// the optimization to calculate the delta once (at the leaseholder).
int64 raft_log_delta = 13;
// RaftExpectedFirstIndex is populated starting at cluster version
// LooselyCoupledRaftLogTruncation. When this is not populated, the replica
// should not delay enacting the truncation.
uint64 raft_expected_first_index = 25;

// MVCCHistoryMutation describes mutations of MVCC history that may violate
// the closed timestamp, timestamp cache, and guarantees that rely on these
Expand Down
Loading