Merge #56373
56373: hlc: introduce synthetic flag on timestamps r=nvanbenschoten a=nvanbenschoten

Informs #52745.
Informs #36431.

This commit introduces an 8-bit `flags` field on the hlc timestamp struct. The flags are used to provide details about the timestamp and its meaning. They do not affect the sort order of Timestamps.
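For illustration, a minimal sketch of the resulting shape (field names follow the usages in the diff below; the real struct is protobuf-generated, and the `IsEmpty` helper shown here is the one the diff migrates to):

```go
// Sketch of the hlc.Timestamp struct after this change (illustrative;
// the real definition is generated from a protobuf).
type Timestamp struct {
	WallTime int64  // nanoseconds since the Unix epoch
	Logical  int32  // breaks ties between equal wall times
	Flags    uint32 // flag bits; ignored when ordering timestamps
}

// IsEmpty reports whether the timestamp is the zero value. The diff
// below migrates comparisons against (hlc.Timestamp{}) to this helper.
func (t Timestamp) IsEmpty() bool {
	return t == (Timestamp{})
}
```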

The commit then introduces the first flag: SYNTHETIC. As discussed in #52745, a synthetic timestamp is defined as a timestamp that makes no claim about the value of clocks in the system. While standard timestamps are pulled from HLC clocks and indicate that some node in the system has a clock with a reading equal to or above its value, a synthetic timestamp makes no such indication. By avoiding a connection to "real time", synthetic timestamps can be used to write values at a future time and to indicate that observed timestamps do not apply to such writes for the purposes of tracking causality between the write and its observers. Synthetic timestamps will be a critical part of implementing non-blocking transactions (#52745) and fixing the interaction between observed timestamps and transaction refreshing (#36431).
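As a sketch of how the flag might be set and queried (the name SYNTHETIC comes from this description; the helper names are assumptions and may not match the real API):

```go
// TimestampFlag is one bit in the Timestamp.Flags bitmask.
type TimestampFlag uint32

const (
	// SYNTHETIC marks a timestamp that makes no claim about
	// clock readings in the system.
	SYNTHETIC TimestampFlag = 1 << 0
)

// IsFlagSet reports whether the given flag is set on the timestamp.
func (t Timestamp) IsFlagSet(f TimestampFlag) bool {
	return t.Flags&uint32(f) != 0
}

// SetFlag returns a copy of the timestamp with the given flag set.
func (t Timestamp) SetFlag(f TimestampFlag) Timestamp {
	t.Flags |= uint32(f)
	return t
}
```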

The original plan was to reserve the high-order bit in the logical portion of a timestamp as a "synthetic bit". This is how I began implementing things, but I moved away from that approach for a few reasons. First, it was fairly subtle and seemed too easy to get wrong; using a separate field is more explicit and avoids a class of bugs. Second, I had serious concerns about how the synthetic bit would impact timestamp ordering. Every timestamp comparison would need to mask out the bit or risk being incorrect, and this was even true of the LSM custom comparator. That seemed difficult to get right, and particularly concerning because we plan to mark only some of a transaction's committed values as synthetic to fix #36431; if we weren't careful, we could get atomicity violations. There were also minor backwards-compatibility concerns.
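To see the masking hazard concretely, a comparison under the rejected design would have looked something like this (hypothetical code; `1 << 31` stands in for the reserved high-order bit):

```go
// Hypothetical comparison under the rejected "synthetic bit" design.
// Every comparison site, including the LSM comparator, would need the
// mask; forgetting it anywhere would order a synthetic timestamp after
// an otherwise-equal real one.
const syntheticBit uint32 = 1 << 31

func lessMasked(a, b Timestamp) bool {
	aLog := uint32(a.Logical) &^ syntheticBit
	bLog := uint32(b.Logical) &^ syntheticBit
	return a.WallTime < b.WallTime ||
		(a.WallTime == b.WallTime && aLog < bLog)
}
```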

A separate field is more expensive in theory, so we need to be careful. In practice, however, it turns out to be mostly free in each case we care about. In memory, the separate field is effectively free because the Timestamp struct was previously 12 bytes but was always padded out to 16 bytes when included as a field in any other struct; the flags field simply replaces existing padding. Over the wire, the field is omitted when zero and uses a varint encoding when non-zero, so again it is mostly free. In the engine key encoding, the field is likewise omitted when zero and takes up only 1 byte when non-zero, so it too is mostly free.
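The padding claim is easy to verify; a small self-contained check (sizes assume a 64-bit platform):

```go
package main

import (
	"fmt"
	"unsafe"
)

// 12 bytes of data, but 8-byte alignment pads the struct to 16 bytes.
type tsOld struct {
	WallTime int64
	Logical  int32
}

// The new field lands in what was previously padding.
type tsNew struct {
	WallTime int64
	Logical  int32
	Flags    uint32
}

func main() {
	fmt.Println(unsafe.Sizeof(tsOld{})) // 16
	fmt.Println(unsafe.Sizeof(tsNew{})) // 16
}
```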

----

First three commits from #56477.

@sumeerbhola I'm hoping you can take a look at the engine-level changes in the `introduce synthetic flag on timestamps` commit (4th commit as of the time of writing). I think the key encoding added here makes sense, but want to make sure you're on board. One possible concern is that we introduce a new 13-byte suffix, which means that combined with a 4-byte sequence number (see #41720 (comment)), we'd collide with the 17 byte `engineKeyVersionLockTableLen`.
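For reference, the arithmetic behind the collision concern (an assumed breakdown; only the 13- and 17-byte figures come from the text above):

```go
// Assumed composition of the new engine key version suffix.
const (
	wallTimeLen = 8 // Timestamp.WallTime
	logicalLen  = 4 // Timestamp.Logical
	flagsLen    = 1 // Timestamp.Flags, encoded only when non-zero

	newVersionLen = wallTimeLen + logicalLen + flagsLen // 13 bytes
	seqNumLen     = 4                                   // per #41720

	// 13 + 4 == 17 == engineKeyVersionLockTableLen, hence the
	// potential collision.
	collisionLen = newVersionLen + seqNumLen
)
```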

@tbg do you mind being the primary reviewer here? I think you know the most about the motivations for this change and will have a good sense of whether this is the best way to introduce additional state on timestamps.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
craig[bot] and nvanbenschoten committed Nov 13, 2020
2 parents b4fae9d + db9ef7e commit 1930679
Showing 89 changed files with 655 additions and 269 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
@@ -838,7 +838,7 @@ func backupPlanHook(
keys.MinKey,
p.User(),
func(span covering.Range, start, end hlc.Timestamp) error {
- if (start == hlc.Timestamp{}) {
+ if start.IsEmpty() {
newSpans = append(newSpans, roachpb.Span{Key: span.Start, EndKey: span.End})
return nil
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/validator.go
@@ -467,7 +467,7 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times
// we fingerprint at `updated.Prev()` since we want to catch cases where one or
// more row updates are missed. For example: If k1 was written at t1, t2, t3 and
// the update for t2 was missed.
- if v.previousRowUpdateTs != (hlc.Timestamp{}) && v.previousRowUpdateTs.Less(row.updated) {
+ if !v.previousRowUpdateTs.IsEmpty() && v.previousRowUpdateTs.Less(row.updated) {
if err := v.fingerprint(row.updated.Prev()); err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed.go
@@ -199,7 +199,7 @@ func kvsToRows(
}
schemaTimestamp := kv.Value.Timestamp
prevSchemaTimestamp := schemaTimestamp
- if backfillTs := input.BackfillTimestamp(); backfillTs != (hlc.Timestamp{}) {
+ if backfillTs := input.BackfillTimestamp(); !backfillTs.IsEmpty() {
schemaTimestamp = backfillTs
prevSchemaTimestamp = schemaTimestamp.Prev()
}
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -80,7 +80,7 @@ func distChangefeedFlow(
// based on whether we should perform an initial scan.
{
h := progress.GetHighWater()
- noHighWater := (h == nil || *h == (hlc.Timestamp{}))
+ noHighWater := (h == nil || h.IsEmpty())
// We want to set the highWater and thus avoid an initial scan if either
// this is a cursor and there was no request for one, or we don't have a
// cursor but we have a request to not have an initial scan.
@@ -92,7 +92,7 @@

spansTS := details.StatementTime
var initialHighWater hlc.Timestamp
- if h := progress.GetHighWater(); h != nil && *h != (hlc.Timestamp{}) {
+ if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
// If we have a high-water set, use it to compute the spans, since the
// ones at the statement time may have been garbage collected by now.
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/buffer.go
@@ -126,7 +126,7 @@ func (b *Event) Timestamp() hlc.Timestamp {
case ResolvedEvent:
return b.resolved.Timestamp
case KVEvent:
- if b.backfillTimestamp != (hlc.Timestamp{}) {
+ if !b.backfillTimestamp.IsEmpty() {
return b.backfillTimestamp
}
return b.kv.Value.Timestamp
@@ -211,6 +211,7 @@ var memBufferColTypes = []*types.T{
types.Bytes, // span.EndKey
types.Int, // ts.WallTime
types.Int, // ts.Logical
+ types.Int, // ts.Flags
}

// memBuffer is an in-memory buffer for changed KV and Resolved timestamp
@@ -266,6 +267,7 @@ func (b *memBuffer) AddKV(
tree.DNull,
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Logical)),
+ b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -284,6 +286,7 @@ func (b *memBuffer) AddResolved(
b.allocMu.a.NewDBytes(tree.DBytes(span.EndKey)),
b.allocMu.a.NewDInt(tree.DInt(ts.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(ts.Logical)),
+ b.allocMu.a.NewDInt(tree.DInt(ts.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -300,6 +303,7 @@ func (b *memBuffer) Get(ctx context.Context) (Event, error) {
ts := hlc.Timestamp{
WallTime: int64(*row[5].(*tree.DInt)),
Logical: int32(*row[6].(*tree.DInt)),
+ Flags: uint32(*row[7].(*tree.DInt)),
}
if row[2] != tree.DNull {
e.prevVal = roachpb.Value{
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -362,7 +362,7 @@ func copyFromSourceToSinkUntilTableEvent(
return false, false
}
frontier.Forward(resolved.Span, boundaryResolvedTimestamp)
- return true, frontier.Frontier() == boundaryResolvedTimestamp
+ return true, frontier.Frontier().EqOrdering(boundaryResolvedTimestamp)
default:
log.Fatal(ctx, "unknown event type")
return false, false
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -369,7 +369,7 @@ func (tf *SchemaFeed) waitForTS(ctx context.Context, ts hlc.Timestamp) error {
tf.mu.Lock()
highWater := tf.mu.highWater
var err error
- if tf.mu.errTS != (hlc.Timestamp{}) && tf.mu.errTS.LessEq(ts) {
+ if !tf.mu.errTS.IsEmpty() && tf.mu.errTS.LessEq(ts) {
err = tf.mu.err
}
fastPath := err != nil || ts.LessEq(highWater)
@@ -437,7 +437,7 @@ func (tf *SchemaFeed) adjustTimestamps(startTS, endTS hlc.Timestamp, validateErr

if validateErr != nil {
// don't care about startTS in the invalid case
- if tf.mu.errTS == (hlc.Timestamp{}) || endTS.Less(tf.mu.errTS) {
+ if tf.mu.errTS.IsEmpty() || endTS.Less(tf.mu.errTS) {
tf.mu.errTS = endTS
tf.mu.err = validateErr
newWaiters := make([]tableHistoryWaiter, 0, len(tf.mu.waiters))
6 changes: 1 addition & 5 deletions pkg/ccl/storageccl/export_test.go
@@ -581,11 +581,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
}
batch.Close()

- sort.Slice(timestamps, func(i, j int) bool {
- return (timestamps[i].WallTime < timestamps[j].WallTime) ||
- (timestamps[i].WallTime == timestamps[j].WallTime &&
- timestamps[i].Logical < timestamps[j].Logical)
- })
+ sort.Slice(timestamps, func(i, j int) bool { return timestamps[i].Less(timestamps[j]) })
return keys, timestamps
}

3 changes: 1 addition & 2 deletions pkg/ccl/storageccl/import.go
@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -220,7 +219,7 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
break
}

- if args.EndTime != (hlc.Timestamp{}) {
+ if !args.EndTime.IsEmpty() {
// TODO(dan): If we have to skip past a lot of versions to find the
// latest one before args.EndTime, then this could be slow.
if args.EndTime.Less(iter.UnsafeKey().Timestamp) {
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
@@ -349,7 +349,7 @@ func loadRangeDescriptor(
) (roachpb.RangeDescriptor, error) {
var desc roachpb.RangeDescriptor
handleKV := func(kv storage.MVCCKeyValue) error {
- if kv.Key.Timestamp == (hlc.Timestamp{}) {
+ if kv.Key.Timestamp.IsEmpty() {
// We only want values, not MVCCMetadata.
return nil
}
4 changes: 2 additions & 2 deletions pkg/jobs/helpers.go
@@ -54,7 +54,7 @@ func NewFakeNodeLiveness(nodeCount int) *FakeNodeLiveness {
nodeID := roachpb.NodeID(i + 1)
nl.mu.livenessMap[nodeID] = &livenesspb.Liveness{
Epoch: 1,
- Expiration: hlc.LegacyTimestamp(hlc.MaxTimestamp),
+ Expiration: hlc.MaxTimestamp.ToLegacyTimestamp(),
NodeID: nodeID,
}
}
@@ -113,7 +113,7 @@ func (nl *FakeNodeLiveness) FakeIncrementEpoch(id roachpb.NodeID) {
func (nl *FakeNodeLiveness) FakeSetExpiration(id roachpb.NodeID, ts hlc.Timestamp) {
nl.mu.Lock()
defer nl.mu.Unlock()
- nl.mu.livenessMap[id].Expiration = hlc.LegacyTimestamp(ts)
+ nl.mu.livenessMap[id].Expiration = ts.ToLegacyTimestamp()
}

// ResetConstructors resets the registered Resumer constructors.
2 changes: 1 addition & 1 deletion pkg/jobs/jobs_test.go
@@ -1499,7 +1499,7 @@ func TestShowJobs(t *testing.T) {
progress := &jobspb.Progress{
ModifiedMicros: in.modified.UnixNano() / time.Microsecond.Nanoseconds(),
}
- if in.highWater != (hlc.Timestamp{}) {
+ if !in.highWater.IsEmpty() {
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &in.highWater,
}
8 changes: 0 additions & 8 deletions pkg/keys/constants.go
@@ -64,9 +64,6 @@ var (
// metadata is identified by one of the suffixes listed below, along
// with potentially additional encoded key info, for instance in the
// case of AbortSpan entry.
- //
- // NOTE: LocalRangeIDPrefix must be kept in sync with the value
- // in storage/engine/rocksdb/db.cc.
LocalRangeIDPrefix = roachpb.RKey(makeKey(localPrefix, roachpb.Key("i")))
// LocalRangeIDReplicatedInfix is the post-Range ID specifier for all Raft
// replicated per-range data. By appending this after the Range ID, these
@@ -135,9 +132,6 @@ var (
// specific sort of per-range metadata is identified by one of the
// suffixes listed below, along with potentially additional encoded
// key info, such as the txn ID in the case of a transaction record.
- //
- // NOTE: LocalRangePrefix must be kept in sync with the value in
- // storage/engine/rocksdb/db.cc.
LocalRangePrefix = roachpb.Key(makeKey(localPrefix, roachpb.RKey("k")))
LocalRangeMax = LocalRangePrefix.PrefixEnd()
// LocalQueueLastProcessedSuffix is the suffix for replica queue state keys.
@@ -152,8 +146,6 @@ var (
LocalRangeDescriptorSuffix = roachpb.RKey("rdsc")
// LocalTransactionSuffix specifies the key suffix for
// transaction records. The additional detail is the transaction id.
- // NOTE: if this value changes, it must be updated in C++
- // (storage/engine/rocksdb/db.cc).
LocalTransactionSuffix = roachpb.RKey("txn-")

// 4. Lock table keys
4 changes: 2 additions & 2 deletions pkg/kv/client_test.go
@@ -338,7 +338,7 @@ func TestClientGetAndPut(t *testing.T) {
if !bytes.Equal(value, gr.ValueBytes()) {
t.Errorf("expected values equal; %s != %s", value, gr.ValueBytes())
}
- if gr.Value.Timestamp == (hlc.Timestamp{}) {
+ if gr.Value.Timestamp.IsEmpty() {
t.Fatalf("expected non-zero timestamp; got empty")
}
}
@@ -361,7 +361,7 @@ func TestClientPutInline(t *testing.T) {
if !bytes.Equal(value, gr.ValueBytes()) {
t.Errorf("expected values equal; %s != %s", value, gr.ValueBytes())
}
- if ts := gr.Value.Timestamp; ts != (hlc.Timestamp{}) {
+ if ts := gr.Value.Timestamp; !ts.IsEmpty() {
t.Fatalf("expected zero timestamp; got %s", ts)
}
}
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -574,7 +574,7 @@ func (ds *DistSender) initAndVerifyBatch(

// In the event that timestamp isn't set and read consistency isn't
// required, set the timestamp using the local clock.
- if ba.ReadConsistency != roachpb.CONSISTENT && ba.Timestamp == (hlc.Timestamp{}) {
+ if ba.ReadConsistency != roachpb.CONSISTENT && ba.Timestamp.IsEmpty() {
ba.Timestamp = ds.clock.Now()
}

@@ -1954,10 +1954,10 @@ func (ds *DistSender) sendToReplicas(
// If the reply contains a timestamp, update the local HLC with it.
if br.Error != nil {
log.VErrEventf(ctx, 2, "%v", br.Error)
- if br.Error.Now != (hlc.Timestamp{}) {
+ if !br.Error.Now.IsEmpty() {
ds.clock.Update(br.Error.Now)
}
- } else if br.Now != (hlc.Timestamp{}) {
+ } else if !br.Now.IsEmpty() {
ds.clock.Update(br.Now)
}

4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/split_test.go
@@ -290,7 +290,7 @@ func TestRangeSplitsStickyBit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- if (desc.GetStickyBit() == hlc.Timestamp{}) {
+ if desc.GetStickyBit().IsEmpty() {
t.Fatal("Sticky bit not set after splitting")
}

@@ -309,7 +309,7 @@
if err != nil {
t.Fatal(err)
}
- if (desc.GetStickyBit() == hlc.Timestamp{}) {
+ if desc.GetStickyBit().IsEmpty() {
t.Fatal("Sticky bit not set after splitting")
}
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -634,7 +634,7 @@ func RunCommitTrigger(
}
if sbt := ct.GetStickyBitTrigger(); sbt != nil {
newDesc := *rec.Desc()
- if sbt.StickyBit != (hlc.Timestamp{}) {
+ if !sbt.StickyBit.IsEmpty() {
newDesc.StickyBit = &sbt.StickyBit
} else {
newDesc.StickyBit = nil
@@ -945,7 +945,7 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}
- if (*gcThreshold == hlc.Timestamp{}) {
+ if gcThreshold.IsEmpty() {
log.VEventf(ctx, 1, "LHS's GCThreshold of split is not set")
}

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/below_raft_protos_test.go
@@ -66,7 +66,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return m
},
emptySum: 7551962144604783939,
- populatedSum: 12720006657210437557,
+ populatedSum: 5737658018003400959,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
@@ -124,7 +124,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
},
emptySum: 14695981039346656037,
- populatedSum: 7432412240713840291,
+ populatedSum: 834545685817460463,
},
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_metrics_test.go
@@ -386,7 +386,7 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
_, metaRepl := getFirstStoreReplica(t, tc.Server(1), keys.Meta2Prefix)
l, _ := metaRepl.GetLease()
- if l.Start == (hlc.Timestamp{}) {
+ if l.Start.IsEmpty() {
return errors.Errorf("don't have a lease for meta1 yet: %v %v", l, meta2Repl1)
}
sinceExpBasedLeaseStart := timeutil.Since(timeutil.Unix(0, l.Start.WallTime))
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
@@ -1771,7 +1771,7 @@ func TestStoreSplitTimestampCacheDifferentLeaseHolder(t *testing.T) {
}

// Verify that the txn's safe timestamp was set.
- if txnOld.TestingCloneTxn().ReadTimestamp == (hlc.Timestamp{}) {
+ if txnOld.TestingCloneTxn().ReadTimestamp.IsEmpty() {
t.Fatal("expected non-zero refreshed timestamp")
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/debug_print.go
@@ -235,7 +235,7 @@ func tryTxn(kv storage.MVCCKeyValue) (string, error) {
}

func tryRangeIDKey(kv storage.MVCCKeyValue) (string, error) {
- if kv.Key.Timestamp != (hlc.Timestamp{}) {
+ if !kv.Key.Timestamp.IsEmpty() {
return "", fmt.Errorf("range ID keys shouldn't have timestamps: %s", kv.Key)
}
_, _, suffix, _, err := keys.DecodeRangeIDKey(kv.Key.Key)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc/data_distribution_test.go
@@ -57,10 +57,10 @@ func (ds dataDistribution) setupTest(
} else {
// TODO(ajwerner): Decide if using MVCCPut is worth it.
ts := kv.Key.Timestamp
- if txn.ReadTimestamp == (hlc.Timestamp{}) {
+ if txn.ReadTimestamp.IsEmpty() {
txn.ReadTimestamp = ts
}
- if txn.WriteTimestamp == (hlc.Timestamp{}) {
+ if txn.WriteTimestamp.IsEmpty() {
txn.WriteTimestamp = ts
}
err := storage.MVCCPut(ctx, eng, &ms, kv.Key.Key, ts,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc.go
@@ -250,7 +250,7 @@ func processReplicatedKeyRange(
if meta.Txn != nil {
// Keep track of intent to resolve if older than the intent
// expiration threshold.
- if hlc.Timestamp(meta.Timestamp).Less(intentExp) {
+ if meta.Timestamp.ToTimestamp().Less(intentExp) {
txnID := meta.Txn.ID
if _, ok := txnMap[txnID]; !ok {
txnMap[txnID] = &roachpb.Transaction{
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc/gc_old_test.go
@@ -101,7 +101,7 @@ func runGCOld(
if meta.Txn != nil {
// Keep track of intent to resolve if older than the intent
// expiration threshold.
- if hlc.Timestamp(meta.Timestamp).Less(intentExp) {
+ if meta.Timestamp.ToTimestamp().Less(intentExp) {
txnID := meta.Txn.ID
if _, ok := txnMap[txnID]; !ok {
txnMap[txnID] = &roachpb.Transaction{
@@ -125,7 +125,7 @@
startIdx = 2
}
// See if any values may be GC'd.
- if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) {
+ if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); !gcTS.IsEmpty() {
// Batch keys after the total size of version keys exceeds
// the threshold limit. This avoids sending potentially large
// GC requests through Raft. Iterate through the keys in reverse
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc_queue.go
@@ -281,7 +281,7 @@ func makeGCQueueScoreImpl(
) gcQueueScore {
ms.Forward(now.WallTime)
var r gcQueueScore
- if (gcThreshold != hlc.Timestamp{}) {
+ if !gcThreshold.IsEmpty() {
r.LikelyLastGC = time.Duration(now.WallTime - gcThreshold.Add(r.TTL.Nanoseconds(), 0).WallTime)
}

6 changes: 2 additions & 4 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -825,8 +825,7 @@ func (nl *NodeLiveness) heartbeatInternal(
// [*]: see TODO below about how errNodeAlreadyLive handling does not
// enforce this guarantee.
beforeQueueTS := nl.clock.Now()
- minExpiration := hlc.LegacyTimestamp(
- beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
+ minExpiration := beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0).ToLegacyTimestamp()

// Before queueing, record the heartbeat as in-flight.
nl.metrics.HeartbeatsInFlight.Inc(1)
@@ -873,8 +872,7 @@ func (nl *NodeLiveness) heartbeatInternal(
// Grab a new clock reading to compute the new expiration time,
// since we may have queued on the semaphore for a while.
afterQueueTS := nl.clock.Now()
- newLiveness.Expiration = hlc.LegacyTimestamp(
- afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
+ newLiveness.Expiration = afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0).ToLegacyTimestamp()
// This guards against the system clock moving backwards. As long
// as the cockroach process is running, checks inside hlc.Clock
// will ensure that the clock never moves backwards, but these