Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: introduce a Raft-based transport for closedts #59566

Merged
merged 2 commits into from
Feb 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ const (
// database, such as adding REGIONS to a DATABASE or setting the LOCALITY
// on a TABLE.
MultiRegionFeatures
// ClosedTimestampsRaftTransport enables the Raft transport for closed
// timestamps and disables the previous per-node transport.
ClosedTimestampsRaftTransport

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -426,6 +429,10 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: MultiRegionFeatures,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 34},
},
{
Key: ClosedTimestampsRaftTransport,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 36},
},
// Step (2): Add new versions here.
})

Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ go_library(
"//pkg/kv/kvserver/closedts/container",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/storage",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/gc",
Expand Down Expand Up @@ -299,6 +300,7 @@ go_test(
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/constraint",
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ func Subsume(
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()
// FrozenClosedTimestamp might return an empty timestamp if the Raft-based
// closed timestamp transport hasn't been enabled yet. That's OK because, if
// the new transport is not enabled, then ranges with leading closed
// timestamps can't exist yet, and so the closed timestamp must be below the
// FreezeStart. The FreezeStart is used by Store.MergeRange to bump the RHS'
// ts cache if LHS/RHS leases are not collocated. The case when the leases are
// collocated also works out because then the closed timestamp (according to
// the old mechanism) is the same for both ranges being merged.
reply.ClosedTimestamp = cArgs.EvalCtx.FrozenClosedTimestamp(ctx)

return result.Result{
Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type EvalContext interface {
GetTerm(uint64) (uint64, error)
GetLeaseAppliedIndex() uint64
GetTracker() closedts.TrackerI
FrozenClosedTimestamp(ctx context.Context) hlc.Timestamp

Desc() *roachpb.RangeDescriptor
ContainsKey(key roachpb.Key) bool
Expand Down Expand Up @@ -184,6 +185,9 @@ func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 {
func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) FrozenClosedTimestamp(ctx context.Context) hlc.Timestamp {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
return m.MockEvalCtx.Desc
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return enginepb.NewPopulatedRangeAppliedState(r, false)
},
// The populatedSum has changed from 10390885694280604642 to
// 7958815789228166749, as of 21.1, due to the addition of the
// SeparatedIntentCount field in MVCCStats. This field will not actually
// be populated until all nodes are on 21.1, so there isn't a risk of
// divergence.
emptySum: 615555020845646359,
populatedSum: 7958815789228166749,
populatedSum: 3253881774919630461,
},
reflect.TypeOf(&raftpb.HardState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3332,7 +3332,7 @@ func TestProposalOverhead(t *testing.T) {
// overhead is that users ranges do not have rangefeeds on by default whereas
// system ranges do.
const (
expectedUserOverhead uint32 = 42
expectedUserOverhead uint32 = 45
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/heap_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@ func (h *heapTracker) LowerBound(ctx context.Context) hlc.Timestamp {
}
return h.mu.rs[0].ts
}

// Count is part of the Tracker interface.
func (h *heapTracker) Count() int {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.rs.Len()
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -169,6 +170,9 @@ func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) {
b := tok.(lockfreeToken).b
// Note that atomic ops are not required here, as we hold the exclusive lock.
b.refcnt--
if b.refcnt < 0 {
log.Fatalf(ctx, "negative bucket refcount: %d", b.refcnt)
}
if b.refcnt == 0 {
// Reset the bucket, so that future Track() calls can create a new one.
b.ts = 0
Expand Down Expand Up @@ -198,6 +202,11 @@ func (t *lockfreeTracker) LowerBound(ctx context.Context) hlc.Timestamp {
}
}

// Count is part of the Tracker interface.
func (t *lockfreeTracker) Count() int {
return int(t.b1.refcnt) + int(t.b2.refcnt)
}

// bucket represent a Tracker bucket: a data structure that coalesces a number
// of timestamps, keeping track only of their count and minimum.
//
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ type Tracker interface {
// make is that, if no synthethic timestamp is inserted into the tracked set
// for a while, eventually the LowerBound value will not be synthetic.
LowerBound(context.Context) hlc.Timestamp

// Count returns the current size of the tracked set.
//
// Count cannot be called concurrently with other methods.
Count() int
}

// RemovalToken represents the result of Track: a token to be later used with
Expand Down
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,36 @@

package kvserverpb

import "math"
import (
"math"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

var maxRaftCommandFooterSize = (&RaftCommandFooter{
MaxLeaseIndex: math.MaxUint64,
}).Size()

var maxClosedTimestampFooterSize = (&ClosedTimestampFooter{
ClosedTimestamp: hlc.Timestamp{
WallTime: math.MaxInt64,
Logical: math.MaxInt32,
Synthetic: true,
},
}).Size()

// MaxRaftCommandFooterSize returns the maximum possible size of an
// encoded RaftCommandFooter proto.
func MaxRaftCommandFooterSize() int {
return maxRaftCommandFooterSize
}

// MaxClosedTimestampFooterSize returns the maximmum possible size of an encoded
// ClosedTimestampFooter.
func MaxClosedTimestampFooterSize() int {
return maxClosedTimestampFooterSize
}

// IsZero returns whether all fields are set to their zero value.
func (r ReplicatedEvalResult) IsZero() bool {
return r == ReplicatedEvalResult{}
Expand Down
Loading