Skip to content

Commit

Permalink
kvserver: introduce a Raft-based transport for closedts
Browse files Browse the repository at this point in the history
This patch introduces a replacement for the existing closed timestamp
mechanism / transport. The new mechanism is gated by a cluster version.

Raft commands now carry increasing closed timestamps generated by the
propBuf by using the recent request Tracker for synchronizing with
in-flight requests (i.e. not closing timestamps below them).
Raft commands get a closed ts field, and the range state gets the field
as well.

The propBuf pays attention to the range's closed timestamp policy for
deciding whether to close lagging or leading timestamps.

Fixes cockroachdb#57395, cockroachdb#57396
Touches cockroachdb#57405

Release note: None
  • Loading branch information
andreimatei committed Feb 18, 2021
1 parent 95c0fe5 commit f19bb99
Show file tree
Hide file tree
Showing 32 changed files with 1,735 additions and 416 deletions.
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ const (
// via their IDs instead of their names, which leads to allowing such
// sequences to be renamed.
SequencesRegclass
// ClosedTimestampsRaftTransport enables the Raft transport for closed
// timestamps and disables the previous per-node transport.
ClosedTimestampsRaftTransport

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -411,6 +414,10 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: SequencesRegclass,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 30},
},
{
Key: ClosedTimestampsRaftTransport,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 32},
},
// Step (2): Add new versions here.
})

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ go_library(
"//pkg/kv/kvserver/closedts/container",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/storage",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/gc",
Expand Down Expand Up @@ -299,6 +300,7 @@ go_test(
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/constraint",
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return enginepb.NewPopulatedRangeAppliedState(r, false)
},
// The populatedSum has changed from 10390885694280604642 to
// 7958815789228166749, as of 21.1, due to the addition of the
// SeparatedIntentCount field in MVCCStats. This field will not actually
// be populated until all nodes are on 21.1, so there isn't a risk of
// divergence.
emptySum: 615555020845646359,
populatedSum: 7958815789228166749,
populatedSum: 3253881774919630461,
},
reflect.TypeOf(&raftpb.HardState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3332,7 +3332,7 @@ func TestProposalOverhead(t *testing.T) {
// overhead is that user ranges do not have rangefeeds on by default whereas
// system ranges do.
const (
expectedUserOverhead uint32 = 42
expectedUserOverhead uint32 = 45
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/heap_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@ func (h *heapTracker) LowerBound(ctx context.Context) hlc.Timestamp {
}
return h.mu.rs[0].ts
}

// Count implements the Tracker interface. It reports the number of entries
// currently held in h.mu.rs, reading the length while h.mu is locked.
func (h *heapTracker) Count() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	size := h.mu.rs.Len()
	return size
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -169,6 +170,9 @@ func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) {
b := tok.(lockfreeToken).b
// Note that atomic ops are not required here, as we hold the exclusive lock.
b.refcnt--
if b.refcnt < 0 {
log.Fatalf(ctx, "negative bucket refcount: %d", b.refcnt)
}
if b.refcnt == 0 {
// Reset the bucket, so that future Track() calls can create a new one.
b.ts = 0
Expand Down Expand Up @@ -198,6 +202,11 @@ func (t *lockfreeTracker) LowerBound(ctx context.Context) hlc.Timestamp {
}
}

// Count implements the Tracker interface. It returns the sum of the reference
// counts of the tracker's two buckets. The counts are read with plain
// (non-atomic) loads.
func (t *lockfreeTracker) Count() int {
	total := int(t.b1.refcnt)
	total += int(t.b2.refcnt)
	return total
}

// bucket represent a Tracker bucket: a data structure that coalesces a number
// of timestamps, keeping track only of their count and minimum.
//
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ type Tracker interface {
// make is that, if no synthetic timestamp is inserted into the tracked set
// for a while, eventually the LowerBound value will not be synthetic.
LowerBound(context.Context) hlc.Timestamp

// Count returns the current size of the tracked set.
//
// Count cannot be called concurrently with other methods.
Count() int
}

// RemovalToken represents the result of Track: a token to be later used with
Expand Down
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,36 @@

package kvserverpb

import "math"
import (
"math"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// maxRaftCommandFooterSize caches the encoded size of a RaftCommandFooter
// whose MaxLeaseIndex holds the largest uint64 value. It is computed once,
// at package initialization.
var maxRaftCommandFooterSize = func() int {
	footer := RaftCommandFooter{MaxLeaseIndex: math.MaxUint64}
	return (&footer).Size()
}()

var maxClosedTimestampFooterSize = (&ClosedTimestampFooter{
ClosedTimestamp: hlc.Timestamp{
WallTime: math.MaxInt64,
Logical: math.MaxInt32,
Synthetic: true,
},
}).Size()

// MaxRaftCommandFooterSize returns the maximum possible size of an
// encoded RaftCommandFooter proto.
//
// The value is read from the package-level maxRaftCommandFooterSize variable
// rather than being recomputed on each call.
func MaxRaftCommandFooterSize() int {
	return maxRaftCommandFooterSize
}

// MaxClosedTimestampFooterSize returns the maximum possible size of an encoded
// ClosedTimestampFooter.
//
// The value is read from the package-level maxClosedTimestampFooterSize
// variable rather than being recomputed on each call.
func MaxClosedTimestampFooterSize() int {
	return maxClosedTimestampFooterSize
}

// IsZero returns whether all fields are set to their zero value.
func (r ReplicatedEvalResult) IsZero() bool {
return r == ReplicatedEvalResult{}
Expand Down
Loading

0 comments on commit f19bb99

Please sign in to comment.