From 6960c610d9dd460e6050e01d854a935f756053e6 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 24 Sep 2020 15:49:31 -0400 Subject: [PATCH 1/9] kvserver: rename a variable to be consistent with naming elsewhere Release note: None --- pkg/kv/kvserver/helpers_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a3a61b766b5b..8431eb3c70d0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -516,10 +516,10 @@ func (nl *NodeLiveness) SetDrainingInternal( func (nl *NodeLiveness) SetDecommissioningInternal( ctx context.Context, nodeID roachpb.NodeID, - liveness LivenessRecord, + oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (changeCommitted bool, err error) { - return nl.setMembershipStatusInternal(ctx, nodeID, liveness, targetStatus) + return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus) } // GetCircuitBreaker returns the circuit breaker controlling From 24f5b76caf4bbc59d05b9c79be202faf11123a32 Mon Sep 17 00:00:00 2001 From: "James H. Linder" Date: Fri, 25 Sep 2020 10:20:37 -0400 Subject: [PATCH 2/9] docker: Base the docker image on RedHat UBI Before: The docker image was based on Debian 9.12 slim. Why: This change will help on-prem customers from a security and compliance perspective. It also aligns with our publishing images into the RedHat Marketplace. Now: Published docker images are based on the RedHat UBI 8 base image. Fixes: #49643 Release note (backward-incompatible change): CockroachDB Docker images are now based on the RedHat ubi8/ubi base image instead of Debian 9.12 slim. This will help on-prem customers from a security and compliance perspective. --- build/deploy/Dockerfile | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/build/deploy/Dockerfile b/build/deploy/Dockerfile index 7d9c62b0dde7..6579a9aeb55e 100644 --- a/build/deploy/Dockerfile +++ b/build/deploy/Dockerfile @@ -1,14 +1,12 @@ -FROM debian:9.12-slim +FROM registry.access.redhat.com/ubi8/ubi -# For deployment, we need -# libc6 - dynamically linked by cockroach binary +# For deployment, we need the following installed (they are installed +# by default in RedHat UBI standard): +# glibc - dynamically linked by cockroach binary # ca-certificates - to authenticate TLS connections for telemetry and # bulk-io with S3/GCS/Azure # tzdata - for time zone functions -RUN apt-get update && \ - apt-get -y upgrade && \ - apt-get install -y libc6 ca-certificates tzdata && \ - rm -rf /var/lib/apt/lists/* +RUN yum update --disablerepo=* --enablerepo=ubi-8-appstream --enablerepo=ubi-8-baseos -y && rm -rf /var/cache/yum # Install GEOS libraries. RUN mkdir /usr/local/lib/cockroach @@ -16,13 +14,13 @@ COPY libgeos.so libgeos_c.so /usr/local/lib/cockroach/ RUN mkdir -p /cockroach COPY cockroach.sh cockroach /cockroach/ + # Set working directory so that relative paths # are resolved appropriately when passed as args. WORKDIR /cockroach/ -# Include the directory into the path -# to make it easier to invoke commands -# via Docker +# Include the directory in the path to make it easier to invoke +# commands via Docker ENV PATH=/cockroach:$PATH ENV COCKROACH_CHANNEL=official-docker From 466b664888169f428e11ce63911d3441d97a4f3a Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 24 Sep 2020 17:58:37 -0400 Subject: [PATCH 3/9] kvserver: return an error from the SetDraining codepath We'll need it in a future commit. 
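As a rough sketch of the caller-facing effect (standalone and illustrative only — `fakeLiveness` and `drainNode` below are invented stand-ins, not code from this patch):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeLiveness stands in for NodeLiveness; only the error-returning shape of
// SetDraining matters for this sketch.
type fakeLiveness struct{ canWrite bool }

// SetDraining mirrors the new signature: failures now reach the caller
// instead of being swallowed inside the retry loop.
func (l *fakeLiveness) SetDraining(ctx context.Context, drain bool) error {
	if !l.canWrite {
		return errors.New("unable to update the liveness record")
	}
	return nil
}

// drainNode shows how a caller can now stop early rather than proceed as if
// the node had been marked as draining.
func drainNode(ctx context.Context, l *fakeLiveness) error {
	if err := l.SetDraining(ctx, true); err != nil {
		return err
	}
	fmt.Println("liveness record marked draining; draining range leases next")
	return nil
}

func main() {
	fmt.Println(drainNode(context.Background(), &fakeLiveness{canWrite: false}))
}
```
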
Not returning an error masked an existing bug (which we leave unfixed for now) where it was possible for us to be unable to mark the target node as draining and still proceed. The usage of SetDraining does not assume it to be a best-effort attempt. Release note: None --- pkg/kv/kvserver/node_liveness.go | 9 +++++++-- pkg/kv/kvserver/node_liveness_test.go | 4 +++- pkg/server/drain.go | 4 +++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 80a08f2695c1..cb59b4b420cc 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -256,7 +256,7 @@ func (nl *NodeLiveness) sem(nodeID roachpb.NodeID) chan struct{} { // pkg/server/drain.go for details. func (nl *NodeLiveness) SetDraining( ctx context.Context, drain bool, reporter func(int, redact.SafeString), -) { +) error { ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { oldLivenessRec, err := nl.SelfEx() @@ -270,8 +270,13 @@ func (nl *NodeLiveness) SetDraining( } continue } - return + return nil } + // TODO(irfansharif): The code flow here seems buggy? It seems it is only a + // best effort attempt at marking the node as draining. It can return + // without an error after trying unsuccessfully a few times. Is that what we + // want? + return nil } // SetMembershipStatus changes the liveness record to reflect the target diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 4fc9aab9d70f..3abc2e509531 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -796,7 +796,9 @@ func TestNodeLivenessSetDraining(t *testing.T) { t.Fatal(err) } - mtc.nodeLivenesses[drainingNodeIdx].SetDraining(ctx, true /* drain */, nil /* reporter */) + if err := mtc.nodeLivenesses[drainingNodeIdx].SetDraining(ctx, true /* drain */, nil /* reporter */); err != nil { + t.Fatal(err) + } // Draining node disappears from store lists. { diff --git a/pkg/server/drain.go b/pkg/server/drain.go index 971dc69daba5..4af61016797f 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -229,6 +229,8 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf // drainNode initiates the draining mode for the node, which // starts draining range leases. func (s *Server) drainNode(ctx context.Context, reporter func(int, redact.SafeString)) error { - s.nodeLiveness.SetDraining(ctx, true /* drain */, reporter) + if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil { + return err + } return s.node.SetDraining(true /* drain */, reporter) } From 6b439960b7a086f4e9ca2ce90d524352301984ec Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 29 Sep 2020 16:06:03 -0400 Subject: [PATCH 4/9] kvserver: return error if drain attempt unsuccessful In an earlier refactor we observed that it was possible to return successfully from a node drain attempt despite not having fully drained the target node. This made the `cockroach node drain` invocation a best effort attempt, not a guaranteed one, which was not the intent. We return the appropriate failure now. 
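For reference, a minimal standalone sketch of the retry-then-fail shape this commit gives `SetDraining`; the `tryMarkDraining` helper and the attempt counts are invented for illustration, while the real code uses the `retry` package and a conditional put on the liveness record:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// tryMarkDraining stands in for a single attempt at the conditional write on
// the liveness record; here it fails on the first two attempts.
func tryMarkDraining(attempt int) error {
	if attempt < 3 {
		return errors.New("conditional put failed")
	}
	return nil
}

// setDraining retries a bounded number of times and, if every attempt fails,
// surfaces an error instead of silently returning success.
func setDraining(ctx context.Context, maxAttempts int) error {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return err // a canceled or expired context takes precedence
		}
		if err := tryMarkDraining(attempt); err != nil {
			continue // transient failure, try again
		}
		return nil
	}
	return errors.New("failed to drain self")
}

func main() {
	fmt.Println(setDraining(context.Background(), 2)) // retries exhausted: error
	fmt.Println(setDraining(context.Background(), 5)) // succeeds on attempt 3: <nil>
}
```
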
Release note: None --- pkg/kv/kvserver/node_liveness.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index cb59b4b420cc..76d3ea74a168 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -272,11 +272,10 @@ func (nl *NodeLiveness) SetDraining( } return nil } - // TODO(irfansharif): The code flow here seems buggy? It seems it is only a - // best effort attempt at marking the node as draining. It can return - // without an error after trying unsuccessfully a few times. Is that what we - // want? - return nil + if err := ctx.Err(); err != nil { + return err + } + return errors.New("failed to drain self") } // SetMembershipStatus changes the liveness record to reflect the target From 48c955dd6dd5d12200ee9a5545172be4b1746dc5 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 24 Sep 2020 18:03:14 -0400 Subject: [PATCH 5/9] kvserver: remove all test instances of empty liveness records In a future commit we'll introduce assertions that this is no longer possible. Let's update our tests to be more representative of inputs we'd normally expect. Release note: None --- pkg/kv/kvserver/node_liveness_test.go | 34 ++++++++++++++++++---- pkg/kv/kvserver/node_liveness_unit_test.go | 2 +- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 3abc2e509531..7d41fe348794 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -481,9 +482,10 @@ func TestNodeLivenessRestart(t *testing.T) { // seeing the liveness record properly gossiped at store startup. var expKeys []string for _, g := range mtc.gossips { - key := gossip.MakeNodeLivenessKey(g.NodeID.Get()) + nodeID := g.NodeID.Get() + key := gossip.MakeNodeLivenessKey(nodeID) expKeys = append(expKeys, key) - if err := g.AddInfoProto(key, &kvserverpb.Liveness{}, 0); err != nil { + if err := g.AddInfoProto(key, &kvserverpb.Liveness{NodeID: nodeID}, 0); err != nil { t.Fatal(err) } } @@ -612,7 +614,15 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { // Advance the clock but only heartbeat node 0. mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + var liveness kvserver.LivenessRecord + testutils.SucceedsSoon(t, func() error { + livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + if err != nil { + return err + } + liveness = livenessRec + return nil + }) testutils.SucceedsSoon(t, func() error { if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { @@ -668,7 +678,15 @@ func TestNodeLivenessGetLivenesses(t *testing.T) { // Advance the clock but only heartbeat node 0. 
 	mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
-	liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+	var liveness kvserver.LivenessRecord
+	testutils.SucceedsSoon(t, func() error {
+		livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+		if err != nil {
+			return err
+		}
+		liveness = livenessRec
+		return nil
+	})
 	if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil {
 		t.Fatal(err)
 	}
@@ -791,7 +809,9 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 	// Verify success on failed update of a liveness record that already has the
 	// given draining setting.
 	if err := mtc.nodeLivenesses[drainingNodeIdx].SetDrainingInternal(
-		ctx, kvserver.LivenessRecord{}, false,
+		ctx, kvserver.LivenessRecord{Liveness: kvserverpb.Liveness{
+			NodeID: drainingNodeID,
+		}}, false,
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -1079,8 +1099,10 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) {
 
 	// Verify success on failed update of a liveness record that already has the
 	// given decommissioning setting.
+	oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID)
+	assert.Nil(t, err)
 	if _, err := callerNodeLiveness.SetDecommissioningInternal(
-		ctx, nodeID, kvserver.LivenessRecord{}, kvserverpb.MembershipStatus_ACTIVE,
+		ctx, nodeID, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE,
 	); err != nil {
 		t.Fatal(err)
 	}
diff --git a/pkg/kv/kvserver/node_liveness_unit_test.go b/pkg/kv/kvserver/node_liveness_unit_test.go
index 17fc439995dc..412e69262953 100644
--- a/pkg/kv/kvserver/node_liveness_unit_test.go
+++ b/pkg/kv/kvserver/node_liveness_unit_test.go
@@ -58,8 +58,8 @@ func TestShouldReplaceLiveness(t *testing.T) {
 	}{
 		{
 			// Epoch update only.
-			kvserverpb.Liveness{},
 			l(1, hlc.Timestamp{}, false, "active"),
+			l(2, hlc.Timestamp{}, false, "active"),
 			yes,
 		},
 		{

From 6d7d5306cabfa0cd8e897ee50e68c0cbdbfccca3 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 24 Sep 2020 18:07:54 -0400
Subject: [PATCH 6/9] kvserver: add assertions for invariants around liveness records

Now that we have #53842, we maintain the invariant that there always
exists a liveness record for any given node. We can now simplify our
handling of liveness records internally: where previously we had code
to handle the possibility of empty liveness records (we created a new
one on the fly), we replace that code with assertions that verify that
empty liveness records are no longer flying around in the system.

When retrieving the liveness record from our in-memory cache, it was
possible for us to not find anything due to gossip delays. Instead of
simply giving up then, now we can read the records directly from KV
(and eventually update our caches to store this newly read record).
This PR introduces this mechanism through usage of
`getLivenessRecordFromKV`.

We should note that the existing cache structure within NodeLiveness is
a look-aside cache, and that's not changed. It would further simplify
things if it were a look-through cache, where the cache updates itself
when a lookup fails to find a record, but we defer that to future work.
A TODO outlining this will be introduced in a future commit.

A note for ease of review: one structural change introduced in this
diff is breaking down `ErrNoLivenessRecord` into
`ErrMissingLivenessRecord` and `errLivenessRecordCacheMiss`.
The former will be used in a future commit to generate better hints for users (it'll only ever surface when attempting to decommission/recommission non-existent nodes). The latter is used to represent cache misses. This too will be improved in a future commit, where instead of returning a specific error on cache access, we'll return a boolean instead. --- We don't intend to backport this to 20.2 due to the hazard described in \#54216. We want this PR to bake on master and (possibly) trip up the assertions added above if we've missed anything. They're the only ones checking for the invariant we've introduced around liveness records. That invariant will be depended on for long running migrations, so better to shake things out early. Release note: None --- pkg/kv/kvserver/helpers_test.go | 7 +- pkg/kv/kvserver/node_liveness.go | 269 ++++++++++----------- pkg/kv/kvserver/node_liveness_test.go | 4 +- pkg/kv/kvserver/node_liveness_unit_test.go | 3 +- 4 files changed, 140 insertions(+), 143 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 8431eb3c70d0..a359b65909cb 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -514,12 +514,9 @@ func (nl *NodeLiveness) SetDrainingInternal( } func (nl *NodeLiveness) SetDecommissioningInternal( - ctx context.Context, - nodeID roachpb.NodeID, - oldLivenessRec LivenessRecord, - targetStatus kvserverpb.MembershipStatus, + ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (changeCommitted bool, err error) { - return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus) + return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus) } // GetCircuitBreaker returns the circuit breaker controlling diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 76d3ea74a168..436b86a47a00 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -40,9 +40,14 @@ import ( ) var ( - // ErrNoLivenessRecord is returned when asking for liveness information - // about a node for which nothing is known. - ErrNoLivenessRecord = errors.New("node not in the liveness table") + // ErrMissingLivenessRecord is returned when asking for liveness information + // about a node for which nothing is known. This happens when attempting to + // {d,r}ecommission a non-existent node. + ErrMissingLivenessRecord = errors.New("missing liveness record") + + // errLivenessRecordCacheMiss is returned when asking for the liveness + // record of a given node and it is not found in the in-memory cache. + errLivenessRecordCacheMiss = errors.New("liveness record not found in cache") // errChangeMembershipStatusFailed is returned when we're not able to // conditionally write the target membership status. It's safe to retry @@ -260,11 +265,22 @@ func (nl *NodeLiveness) SetDraining( ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { oldLivenessRec, err := nl.SelfEx() - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - log.Errorf(ctx, "unexpected error getting liveness: %+v", err) - } - err = nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter) if err != nil { + if !errors.Is(err, errLivenessRecordCacheMiss) { + // TODO(irfansharif): Remove this when we change the signature + // of SelfEx to return a boolean instead of an error. 
+ log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) + } + // There was a cache miss, let's now fetch the record from KV + // directly. + nodeID := nl.gossip.NodeID.Get() + livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID) + if err != nil { + return err + } + oldLivenessRec = livenessRec + } + if err := nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter); err != nil { if log.V(1) { log.Infof(ctx, "attempting to set liveness draining status to %v: %v", drain, err) } @@ -336,7 +352,8 @@ func (nl *NodeLiveness) SetMembershipStatus( return false, errors.Wrap(err, "unable to get liveness") } if kv.Value == nil { - return false, ErrNoLivenessRecord + // We must be trying to decommission a node that does not exist. + return false, ErrMissingLivenessRecord } if err := kv.Value.GetProto(&oldLiveness); err != nil { return false, errors.Wrap(err, "invalid liveness record") @@ -351,8 +368,8 @@ func (nl *NodeLiveness) SetMembershipStatus( // Offer it to make sure that when we actually try to update the // liveness, the previous view is correct. This, too, is required to // de-flake TestNodeLivenessDecommissionAbsent. - nl.maybeUpdate(oldLivenessRec) - return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus) + nl.maybeUpdate(ctx, oldLivenessRec) + return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus) } for { @@ -383,18 +400,14 @@ func (nl *NodeLiveness) setDrainingInternal( <-sem }() - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, - } - } else { - newLiveness = oldLivenessRec.Liveness + return errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. We start off with a + // copy of our existing liveness record. + newLiveness := oldLivenessRec.Liveness + if reporter != nil && drain && !newLiveness.Draining { // Report progress to the Drain RPC. reporter(1, "liveness record") @@ -408,7 +421,7 @@ func (nl *NodeLiveness) setDrainingInternal( ignoreCache: true, } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) if actual.Draining == update.newLiveness.Draining { return errNodeDrainingSet @@ -425,7 +438,7 @@ func (nl *NodeLiveness) setDrainingInternal( return err } - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) return nil } @@ -505,29 +518,15 @@ func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb } func (nl *NodeLiveness) setMembershipStatusInternal( - ctx context.Context, - nodeID roachpb.NodeID, - oldLivenessRec LivenessRecord, - targetStatus kvserverpb.MembershipStatus, + ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (statusChanged bool, err error) { - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - // - // TODO(irfansharif): The above is now no longer possible. We always - // create one (see CreateLivenessRecord, WriteInitialClusterData) when - // adding a node to the cluster. We should clean up all this logic that - // tries to work around the liveness record possibly not existing. 
- newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, - } - } else { - // We start off with a copy of our existing liveness record. - newLiveness = oldLivenessRec.Liveness + return false, errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. We start off with a + // copy of our existing liveness record. + newLiveness := oldLivenessRec.Liveness newLiveness.Membership = targetStatus if oldLivenessRec.Membership == newLiveness.Membership { // No-op. Return early. @@ -539,11 +538,8 @@ func (nl *NodeLiveness) setMembershipStatusInternal( return false, nil } - if oldLivenessRec.Liveness != (kvserverpb.Liveness{}) { - err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness) - if err != nil { - return false, err - } + if err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness); err != nil { + return false, err } update := livenessUpdate{ @@ -636,8 +632,19 @@ func (nl *NodeLiveness) StartHeartbeat( // Retry heartbeat in the event the conditional put fails. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { oldLiveness, err := nl.Self() - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - log.Errorf(ctx, "unexpected error getting liveness: %+v", err) + if err != nil { + if !errors.Is(err, errLivenessRecordCacheMiss) { + // TODO(irfansharif): Remove this when we change the signature + // of SelfEx to return a boolean instead of an error. + log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) + } + nodeID := nl.gossip.NodeID.Get() + liveness, err := nl.getLivenessFromKV(ctx, nodeID) + if err != nil { + log.Infof(ctx, "unable to get liveness record: %s", err) + continue + } + oldLiveness = liveness } if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil { if errors.Is(err, ErrEpochIncremented) { @@ -797,67 +804,13 @@ func (nl *NodeLiveness) heartbeatInternal( } } - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness - if oldLiveness != (kvserverpb.Liveness{}) { - // Start off with our existing view of liveness. - newLiveness = oldLiveness - } else { - // We haven't seen our own liveness record yet[1]. This happens when - // we're heartbeating for the very first time. Let's retrieve it from KV - // before proceeding. - // - // [1]: Elsewhere we maintain the invariant that there always exist a - // liveness record for every given node. See the join RPC and - // WriteInitialClusterData for where that's done. - kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) - if err != nil { - return errors.Wrap(err, "unable to get liveness") - } - - if kv.Value != nil { - // This is the happy path. Let's unpack the liveness record we found - // within KV, and use that to inform what our new liveness should - // be. - if err := kv.Value.GetProto(&oldLiveness); err != nil { - return errors.Wrap(err, "invalid liveness record") - } - - oldLivenessRec := LivenessRecord{ - Liveness: oldLiveness, - raw: kv.Value.TagAndDataBytes(), - } - - // Update our cache with the liveness record we just found. - nl.maybeUpdate(oldLivenessRec) - - newLiveness = oldLiveness - } else { - // This is a "should basically never happen" scenario given our - // invariant around always persisting liveness records on node - // startup. But that was a change we added in 20.2. 
Though unlikely, - // it's possible to get into the following scenario: - // - // - v20.1 node gets added to v20.1 cluster, and is quickly removed - // before being able to persist its liveness record. - // - The cluster is upgraded to v20.2. - // - The node from earlier is rolled into v20.2, and re-added to the - // cluster. - // - It's never able to successfully heartbeat (it didn't join - // through the join rpc, bootstrap, or gossip). Welp. - // - // Given this possibility, we'll just fall back to creating the - // liveness record here as we did in v20.1 code. - // - // TODO(irfansharif): Remove this once v20.2 is cut. - log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID) - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 0, // incremented to epoch=1 below as needed - } - } + if oldLiveness == (kvserverpb.Liveness{}) { + return errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. Start off with our + // existing view of things. + newLiveness := oldLiveness if incrementEpoch { newLiveness.Epoch++ newLiveness.Draining = false // clear draining field @@ -882,7 +835,7 @@ func (nl *NodeLiveness) heartbeatInternal( } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { // Update liveness to actual value on mismatch. - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) // If the actual liveness is different than expected, but is // considered live, treat the heartbeat as a success. This can @@ -923,12 +876,12 @@ func (nl *NodeLiveness) heartbeatInternal( } log.VEventf(ctx, 1, "heartbeat %+v", written.Expiration) - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) nl.metrics.HeartbeatSuccesses.Inc(1) return nil } -// Self returns the liveness record for this node. ErrNoLivenessRecord +// Self returns the liveness record for this node. ErrMissingLivenessRecord // is returned in the event that the node has neither heartbeat its // liveness record successfully, nor received a gossip message containing // a former liveness update on restart. @@ -1004,11 +957,53 @@ func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, erro return nl.getLivenessLocked(nodeID) } +// TODO(irfansharif): This only returns one possible error, so should be made to +// return a boolean instead. func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) { if l, ok := nl.mu.nodes[nodeID]; ok { return l, nil } - return LivenessRecord{}, ErrNoLivenessRecord + return LivenessRecord{}, errLivenessRecordCacheMiss +} + +// getLivenessFromKV fetches the liveness record from KV for a given node, and +// updates the internal in-memory cache when doing so. +func (nl *NodeLiveness) getLivenessFromKV( + ctx context.Context, nodeID roachpb.NodeID, +) (kvserverpb.Liveness, error) { + livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID) + if err != nil { + return kvserverpb.Liveness{}, err + } + return livenessRec.Liveness, nil +} + +// getLivenessRecordFromKV is like getLivenessFromKV, but returns the raw, +// encoded value that the database has for this liveness record in addition to +// the decoded liveness proto. 
+func (nl *NodeLiveness) getLivenessRecordFromKV( + ctx context.Context, nodeID roachpb.NodeID, +) (LivenessRecord, error) { + kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) + if err != nil { + return LivenessRecord{}, errors.Wrap(err, "unable to get liveness") + } + if kv.Value == nil { + return LivenessRecord{}, errors.AssertionFailedf("missing liveness record") + } + var liveness kvserverpb.Liveness + if err := kv.Value.GetProto(&liveness); err != nil { + return LivenessRecord{}, errors.Wrap(err, "invalid liveness record") + } + + livenessRec := LivenessRecord{ + Liveness: liveness, + raw: kv.Value.TagAndDataBytes(), + } + + // Update our cache with the liveness record we just found. + nl.maybeUpdate(ctx, livenessRec) + return livenessRec, nil } // IncrementEpoch is called to attempt to revoke another node's @@ -1058,7 +1053,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb. update.newLiveness.Epoch++ written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) if actual.Epoch > liveness.Epoch { return ErrEpochAlreadyIncremented @@ -1072,7 +1067,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb. } log.Infof(ctx, "incremented n%d liveness epoch to %d", written.NodeID, written.Epoch) - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) nl.metrics.EpochIncrements.Inc(1) return nil } @@ -1158,10 +1153,10 @@ func (nl *NodeLiveness) updateLivenessAttempt( } l, err := nl.GetLiveness(update.newLiveness.NodeID) - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { + if err != nil { return LivenessRecord{}, err } - if err == nil && l.Liveness != update.oldLiveness { + if l.Liveness != update.oldLiveness { return LivenessRecord{}, handleCondFailed(l) } oldRaw = l.raw @@ -1219,40 +1214,49 @@ func (nl *NodeLiveness) updateLivenessAttempt( // maybeUpdate replaces the liveness (if it appears newer) and invokes the // registered callbacks if the node became live in the process. -func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) { - // An empty new means that we haven't updated anything. - if new.Liveness == (kvserverpb.Liveness{}) { - return +func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec LivenessRecord) { + if newLivenessRec.Liveness == (kvserverpb.Liveness{}) { + log.Fatal(ctx, "invalid new liveness record; found to be empty") } + var shouldReplace bool nl.mu.Lock() - old := nl.mu.nodes[new.NodeID] + oldLivenessRec, err := nl.getLivenessLocked(newLivenessRec.NodeID) + if err != nil { + if !errors.Is(err, errLivenessRecordCacheMiss) { + // TODO(irfansharif): Remove this when we change the signature + // of SelfEx to return a boolean instead of an error. + log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) + } + shouldReplace = true + } else { + shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness) + } - should := shouldReplaceLiveness(old.Liveness, new.Liveness) var callbacks []IsLiveCallback - if should { - nl.mu.nodes[new.NodeID] = new + if shouldReplace { + nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec callbacks = append(callbacks, nl.mu.callbacks...) 
} nl.mu.Unlock() - if !should { + if !shouldReplace { return } now := nl.clock.Now().GoTime() - if !old.IsLive(now) && new.IsLive(now) { + if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { for _, fn := range callbacks { - fn(new.Liveness) + fn(newLivenessRec.Liveness) } } } // shouldReplaceLiveness checks to see if the new liveness is in fact newer // than the old liveness. -func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { +func shouldReplaceLiveness(ctx context.Context, old, new kvserverpb.Liveness) bool { if (old == kvserverpb.Liveness{}) { - return true + log.Fatal(ctx, "invalid old liveness record; found to be empty") } // Compare liveness information. If old < new, replace. @@ -1276,12 +1280,13 @@ func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { // in-memory liveness info up to date. func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) { var liveness kvserverpb.Liveness + ctx := context.TODO() if err := content.GetProto(&liveness); err != nil { - log.Errorf(context.TODO(), "%v", err) + log.Errorf(ctx, "%v", err) return } - nl.maybeUpdate(LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()}) + nl.maybeUpdate(ctx, LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()}) } // numLiveNodes is used to populate a metric that tracks the number of live @@ -1293,8 +1298,6 @@ func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) { // nodes reporting the metric, so it's simplest to just have all live nodes // report it. func (nl *NodeLiveness) numLiveNodes() int64 { - ctx := nl.ambientCtx.AnnotateCtx(context.Background()) - selfID := nl.gossip.NodeID.Get() if selfID == 0 { return 0 @@ -1304,11 +1307,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 { defer nl.mu.RUnlock() self, err := nl.getLivenessLocked(selfID) - if errors.Is(err, ErrNoLivenessRecord) { - return 0 - } if err != nil { - log.Warningf(ctx, "looking up own liveness: %+v", err) return 0 } now := nl.clock.Now().GoTime() diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 7d41fe348794..af95e76c6839 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -1102,7 +1102,7 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID) assert.Nil(t, err) if _, err := callerNodeLiveness.SetDecommissioningInternal( - ctx, nodeID, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE, + ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE, ); err != nil { t.Fatal(err) } @@ -1159,7 +1159,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { // When the node simply never existed, expect an error. 
 	if _, err := mtc.nodeLivenesses[0].SetMembershipStatus(
 		ctx, goneNodeID, kvserverpb.MembershipStatus_DECOMMISSIONING,
-	); !errors.Is(err, kvserver.ErrNoLivenessRecord) {
+	); !errors.Is(err, kvserver.ErrMissingLivenessRecord) {
 		t.Fatal(err)
 	}
diff --git a/pkg/kv/kvserver/node_liveness_unit_test.go b/pkg/kv/kvserver/node_liveness_unit_test.go
index 412e69262953..9b780ace0777 100644
--- a/pkg/kv/kvserver/node_liveness_unit_test.go
+++ b/pkg/kv/kvserver/node_liveness_unit_test.go
@@ -11,6 +11,7 @@
 package kvserver
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -106,7 +107,7 @@ func TestShouldReplaceLiveness(t *testing.T) {
 		},
 	} {
 		t.Run("", func(t *testing.T) {
-			if act := shouldReplaceLiveness(test.old, test.new); act != test.exp {
+			if act := shouldReplaceLiveness(context.Background(), test.old, test.new); act != test.exp {
 				t.Errorf("unexpected update: %+v", test)
 			}
 		})

From d6312391609b3e312a963a6d8193723abed0c36b Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 24 Sep 2020 17:54:08 -0400
Subject: [PATCH 7/9] kvserver: add TODO about caching structure improvement

This TODO hopefully adds some clarity to the current code structure and
suggests what it should look like going forward.

Release note: None
---
 pkg/kv/kvserver/node_liveness.go | 35 +++++++++++++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go
index 436b86a47a00..474bcc7a90fc 100644
--- a/pkg/kv/kvserver/node_liveness.go
+++ b/pkg/kv/kvserver/node_liveness.go
@@ -182,7 +182,40 @@ type NodeLiveness struct {
 
 	mu struct {
 		syncutil.RWMutex
-		callbacks []IsLiveCallback
+		callbacks []IsLiveCallback
+		// nodes is an in-memory cache of liveness records that NodeLiveness
+		// knows about (having learnt of them through gossip or through KV).
+		// It's a look-aside cache, and is accessed primarily through
+		// `getLivenessLocked` and callers.
+		//
+		// TODO(irfansharif): The caching story for NodeLiveness is a bit
+		// complicated. This can be attributed to the fact that pre-20.2, we
+		// weren't always guaranteed to have liveness records for every given
+		// node. Because of this it wasn't possible to have a
+		// look-through cache (it wouldn't know where to fetch from if a record
+		// was found to be missing).
+		//
+		// Now that we're always guaranteed to have a liveness record present,
+		// we should change this out to be a look-through cache instead (it can
+		// fall back to KV when a given record is missing). This would help
+		// simplify our current structure, where we do the following:
+		//
+		// - Consult this cache to find an existing liveness record
+		// - If missing, fetch the record from KV
+		// - Update the liveness record in KV
+		// - Add the updated record into this cache (see `maybeUpdate`)
+		//
+		// (See `StartHeartbeat` for an example of this pattern.)
+		//
+		// What we want instead is a bit simpler:
+		//
+		// - Consult this cache to find an existing liveness record
+		// - If missing, fetch the record from KV, update and return from cache
+		// - Update the liveness record in KV
+		// - Add the updated record into this cache
+		//
+		// More concretely, we want `getLivenessRecordFromKV` to be tucked away
+		// within `getLivenessLocked`.
 		nodes map[roachpb.NodeID]LivenessRecord
 		heartbeatCallback HeartbeatCallback
 		// Before heartbeating, we write to each of these engines to avoid

From 84d9c12bdce0747026d232c9fba3918bd1d91ab0 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 24 Sep 2020 18:00:00 -0400
Subject: [PATCH 8/9] server,cli: improve {d,r}ecommissioning user errors

Specifically around {d,r}ecommissioning non-existent nodes. We're able
to do this now because we can always assume that a missing liveness
record, as seen in the decommission/recommission codepaths, implies
that the user is trying to decommission/recommission a non-existent
node.

Release note: None
---
 pkg/cli/node.go      | 22 ++++++++++++++++++++--
 pkg/server/server.go |  5 +++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pkg/cli/node.go b/pkg/cli/node.go
index 8ed8f735362b..3d7fd3863cac 100644
--- a/pkg/cli/node.go
+++ b/pkg/cli/node.go
@@ -337,7 +337,17 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error {
 	}
 
 	c := serverpb.NewAdminClient(conn)
-	return runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, nodeIDs)
+	if err := runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, nodeIDs); err != nil {
+		cause := errors.UnwrapAll(err)
+		if s, ok := status.FromError(cause); ok && s.Code() == codes.NotFound {
+			// Are we trying to decommission a node that does not
+			// exist? See Server.Decommission for where this specific grpc error
+			// code is generated.
+			return errors.New("node does not exist")
+		}
+		return err
+	}
+	return nil
 }
 
 func handleNodeDecommissionSelf(
@@ -566,13 +576,21 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error {
 	}
 
 	resp, err := c.Decommission(ctx, req)
 	if err != nil {
+		cause := errors.UnwrapAll(err)
 		// If it's a specific illegal membership transition error, we try to
 		// surface a more readable message to the user. See
 		// ValidateLivenessTransition in kvserverpb/liveness.go for where this
 		// error is generated.
-		if s, ok := status.FromError(err); ok && s.Code() == codes.FailedPrecondition {
+		if s, ok := status.FromError(cause); ok && s.Code() == codes.FailedPrecondition {
 			return errors.Newf("%s", s.Message())
 		}
+		if s, ok := status.FromError(cause); ok && s.Code() == codes.NotFound {
+			// Are we trying to recommission a node that does not
+			// exist? See Server.Decommission for where this specific grpc error
+			// code is generated.
+ fmt.Fprintln(stderr) + return errors.New("node does not exist") + } return err } return printDecommissionStatus(*resp) diff --git a/pkg/server/server.go b/pkg/server/server.go index 46ddcf4fe877..389e3b73a45e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -86,7 +86,9 @@ import ( gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + grpcstatus "google.golang.org/grpc/status" ) var ( @@ -1965,6 +1967,9 @@ func (s *Server) Decommission( for _, nodeID := range nodeIDs { statusChanged, err := s.nodeLiveness.SetMembershipStatus(ctx, nodeID, targetStatus) if err != nil { + if errors.Is(err, kvserver.ErrMissingLivenessRecord) { + return grpcstatus.Error(codes.NotFound, kvserver.ErrMissingLivenessRecord.Error()) + } return err } if statusChanged { From 785aea768a8c8bdf95f8743a74a14c973baf0f6e Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 24 Sep 2020 19:07:14 -0400 Subject: [PATCH 9/9] kvserver: improve look-aside cache get API Specifically, apply the following diff (and propagate through callers): ```diff - getLivenessLocked(roachpb.NodeID) (LivenessRecord, error) + getLivenessLocked(roachpb.NodeID) (_ LivenessRecord, ok bool) ``` Previously there was only one possible error type returned, so just a drive-by simplification. Release note: None --- pkg/jobs/helpers.go | 4 +- pkg/jobs/registry.go | 6 +- pkg/kv/kvserver/client_test.go | 7 +- pkg/kv/kvserver/node_liveness.go | 99 +++++++-------- pkg/kv/kvserver/node_liveness_test.go | 115 ++++++++---------- pkg/kv/kvserver/replica_gc_queue.go | 3 +- pkg/kv/kvserver/replica_range_lease.go | 14 +-- pkg/kv/kvserver/store_pool.go | 4 +- pkg/server/admin.go | 12 +- pkg/server/admin_test.go | 6 +- pkg/sql/optionalnodeliveness/node_liveness.go | 2 +- 11 files changed, 123 insertions(+), 149 deletions(-) diff --git a/pkg/jobs/helpers.go b/pkg/jobs/helpers.go index 9fe6f2e245eb..9c17ede4b962 100644 --- a/pkg/jobs/helpers.go +++ b/pkg/jobs/helpers.go @@ -65,14 +65,14 @@ func (*FakeNodeLiveness) ModuleTestingKnobs() {} // Self implements the implicit storage.NodeLiveness interface. It uses NodeID // as the node ID. On every call, a nonblocking send is performed over nl.ch to // allow tests to execute a callback. -func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, error) { +func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, bool) { select { case nl.SelfCalledCh <- struct{}{}: default: } nl.mu.Lock() defer nl.mu.Unlock() - return *nl.mu.livenessMap[FakeNodeID.Get()], nil + return *nl.mu.livenessMap[FakeNodeID.Get()], true } // GetLivenesses implements the implicit storage.NodeLiveness interface. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 940808aaf36d..9d5c34ed8453 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -784,10 +784,10 @@ func (r *Registry) maybeCancelJobsDeprecated( // https://github.com/cockroachdb/cockroach/issues/47892 return } - liveness, err := nl.Self() - if err != nil { + liveness, ok := nl.Self() + if !ok { if nodeLivenessLogLimiter.ShouldLog() { - log.Warningf(ctx, "unable to get node liveness: %s", err) + log.Warning(ctx, "own liveness record not found") } // Conservatively assume our lease has expired. Abort all jobs. 
r.deprecatedCancelAll(ctx) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 15424fbe9526..1942ff5e613c 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1435,11 +1435,12 @@ func (m *multiTestContext) heartbeatLiveness(ctx context.Context, store int) err m.mu.RLock() nl := m.nodeLivenesses[store] m.mu.RUnlock() - l, err := nl.Self() - if err != nil { - return err + l, ok := nl.Self() + if !ok { + return errors.New("liveness not found") } + var err error for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); { if err = nl.Heartbeat(ctx, l); !errors.Is(err, kvserver.ErrEpochIncremented) { break diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 474bcc7a90fc..670050169394 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -297,13 +297,8 @@ func (nl *NodeLiveness) SetDraining( ) error { ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { - oldLivenessRec, err := nl.SelfEx() - if err != nil { - if !errors.Is(err, errLivenessRecordCacheMiss) { - // TODO(irfansharif): Remove this when we change the signature - // of SelfEx to return a boolean instead of an error. - log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) - } + oldLivenessRec, ok := nl.SelfEx() + if !ok { // There was a cache miss, let's now fetch the record from KV // directly. nodeID := nl.gossip.NodeID.Get() @@ -610,9 +605,14 @@ func (nl *NodeLiveness) GetLivenessThreshold() time.Duration { // whether or not its liveness has expired regardless of the liveness status. It // is an error if the specified node is not in the local liveness table. func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { - liveness, err := nl.GetLiveness(nodeID) - if err != nil { - return false, err + liveness, ok := nl.GetLiveness(nodeID) + if !ok { + // TODO(irfansharif): We only expect callers to supply us with node IDs + // they learnt through existing liveness records, which implies we + // should never find ourselves here. We should clean up this conditional + // once we re-visit the caching structure used within NodeLiveness; + // we should be able to return ErrMissingLivenessRecord instead. + return false, errLivenessRecordCacheMiss } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to // consider clock signals from other nodes. @@ -664,17 +664,12 @@ func (nl *NodeLiveness) StartHeartbeat( func(ctx context.Context) error { // Retry heartbeat in the event the conditional put fails. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { - oldLiveness, err := nl.Self() - if err != nil { - if !errors.Is(err, errLivenessRecordCacheMiss) { - // TODO(irfansharif): Remove this when we change the signature - // of SelfEx to return a boolean instead of an error. - log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) - } + oldLiveness, ok := nl.Self() + if !ok { nodeID := nl.gossip.NodeID.Get() liveness, err := nl.getLivenessFromKV(ctx, nodeID) if err != nil { - log.Infof(ctx, "unable to get liveness record: %s", err) + log.Infof(ctx, "unable to get liveness record from KV: %s", err) continue } oldLiveness = liveness @@ -831,8 +826,8 @@ func (nl *NodeLiveness) heartbeatInternal( // would hit a ConditionFailedError and return a errNodeAlreadyLive down // below. 
if !incrementEpoch { - curLiveness, err := nl.Self() - if err == nil && minExpiration.Less(curLiveness.Expiration) { + curLiveness, ok := nl.Self() + if ok && minExpiration.Less(curLiveness.Expiration) { return nil } } @@ -918,17 +913,17 @@ func (nl *NodeLiveness) heartbeatInternal( // is returned in the event that the node has neither heartbeat its // liveness record successfully, nor received a gossip message containing // a former liveness update on restart. -func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) { - rec, err := nl.SelfEx() - if err != nil { - return kvserverpb.Liveness{}, err +func (nl *NodeLiveness) Self() (_ kvserverpb.Liveness, ok bool) { + rec, ok := nl.SelfEx() + if !ok { + return kvserverpb.Liveness{}, false } - return rec.Liveness, nil + return rec.Liveness, true } // SelfEx is like Self, but returns the raw, encoded value that the database has // for this liveness record in addition to the decoded liveness proto. -func (nl *NodeLiveness) SelfEx() (LivenessRecord, error) { +func (nl *NodeLiveness) SelfEx() (_ LivenessRecord, ok bool) { nl.mu.RLock() defer nl.mu.RUnlock() return nl.getLivenessLocked(nl.gossip.NodeID.Get()) @@ -979,24 +974,26 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness { return livenesses } -// GetLiveness returns the liveness record for the specified nodeID. -// ErrNoLivenessRecord is returned in the event that nothing is yet known about -// nodeID via liveness gossip. The record returned also includes the raw, -// encoded value that the database has for this liveness record in addition to -// the decoded liveness proto. -func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, error) { +// GetLiveness returns the liveness record for the specified nodeID. If the +// liveness record is not found (due to gossip propagation delays or due to the +// node not existing), we surface that to the caller. The record returned also +// includes the raw, encoded value that the database has for this liveness +// record in addition to the decoded liveness proto. +func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) { nl.mu.RLock() defer nl.mu.RUnlock() return nl.getLivenessLocked(nodeID) } -// TODO(irfansharif): This only returns one possible error, so should be made to -// return a boolean instead. -func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) { +// getLivenessLocked returns the liveness record for the specified nodeID, +// consulting the in-memory cache. If nothing is found (could happen due to +// gossip propagation delays or the node not existing), we surface that to the +// caller. +func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) { if l, ok := nl.mu.nodes[nodeID]; ok { - return l, nil + return l, true } - return LivenessRecord{}, errLivenessRecordCacheMiss + return LivenessRecord{}, false } // getLivenessFromKV fetches the liveness record from KV for a given node, and @@ -1185,9 +1182,12 @@ func (nl *NodeLiveness) updateLivenessAttempt( log.Fatalf(ctx, "unexpected oldRaw when ignoreCache not specified") } - l, err := nl.GetLiveness(update.newLiveness.NodeID) - if err != nil { - return LivenessRecord{}, err + l, ok := nl.GetLiveness(update.newLiveness.NodeID) + if !ok { + // TODO(irfansharif): See TODO in `NodeLiveness.IsLive`, the same + // applies to this conditional. We probably want to be able to + // return ErrMissingLivenessRecord here instead. 
+ return LivenessRecord{}, errLivenessRecordCacheMiss } if l.Liveness != update.oldLiveness { return LivenessRecord{}, handleCondFailed(l) @@ -1254,13 +1254,8 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness var shouldReplace bool nl.mu.Lock() - oldLivenessRec, err := nl.getLivenessLocked(newLivenessRec.NodeID) - if err != nil { - if !errors.Is(err, errLivenessRecordCacheMiss) { - // TODO(irfansharif): Remove this when we change the signature - // of SelfEx to return a boolean instead of an error. - log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) - } + oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID) + if !ok { shouldReplace = true } else { shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness) @@ -1339,8 +1334,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 { nl.mu.RLock() defer nl.mu.RUnlock() - self, err := nl.getLivenessLocked(selfID) - if err != nil { + self, ok := nl.getLivenessLocked(selfID) + if !ok { return 0 } now := nl.clock.Now().GoTime() @@ -1364,9 +1359,9 @@ func (nl *NodeLiveness) numLiveNodes() int64 { func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn { return func(nodeID roachpb.NodeID) (hlc.Timestamp, ctpb.Epoch, error) { now := nl.clock.Now() - liveness, err := nl.GetLiveness(nodeID) - if err != nil { - return hlc.Timestamp{}, 0, err + liveness, ok := nl.GetLiveness(nodeID) + if !ok { + return hlc.Timestamp{}, 0, errLivenessRecordCacheMiss } if !liveness.IsLive(now.GoTime()) { return hlc.Timestamp{}, 0, errLiveClockNotLive diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index af95e76c6839..5c94867097e0 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -109,10 +109,8 @@ func TestNodeLiveness(t *testing.T) { } // Trigger a manual heartbeat and verify liveness is reestablished. for _, nl := range mtc.nodeLivenesses { - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) for { err := nl.Heartbeat(context.Background(), l) if err == nil { @@ -146,10 +144,8 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { // Verify liveness of all nodes for all nodes. 
verifyLiveness(t, mtc) - liveness, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) - if err != nil { - t.Fatal(err) - } + liveness, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + assert.True(t, ok) if liveness.Epoch != 1 { t.Errorf("expected epoch to be set to 1 initially; got %d", liveness.Epoch) } @@ -185,10 +181,8 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) { t.Fatalf("node %d not live", nodeID) } - livenessRec, err := nl.GetLiveness(nodeID) - if err != nil { - t.Fatal(err) - } + livenessRec, ok := nl.GetLiveness(nodeID) + assert.True(t, ok) if livenessRec.NodeID != nodeID { t.Fatalf("expected node ID %d, got %d", nodeID, livenessRec.NodeID) } @@ -205,9 +199,9 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) { func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) { testutils.SucceedsSoon(t, func() error { - liveness, err := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get()) - if err != nil { - return err + liveness, ok := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get()) + if !ok { + return errors.New("liveness not found") } if liveness.Epoch < 2 { return errors.Errorf("expected epoch to be >=2 on restart but was %d", liveness.Epoch) @@ -235,8 +229,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { nl.PauseHeartbeatLoopForTest() enableSync := nl.PauseSynchronousHeartbeatsForTest() - liveness, err := nl.Self() - require.NoError(t, err) + liveness, ok := nl.Self() + assert.True(t, ok) hbBefore := nl.Metrics().HeartbeatSuccesses.Count() require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) @@ -251,10 +245,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { if err := nl.Heartbeat(ctx, liveness); err != nil { return err } - livenessAfter, err := nl.Self() - if err != nil { - return err - } + livenessAfter, found := nl.Self() + assert.True(t, found) exp := livenessAfter.Expiration minExp := hlc.LegacyTimestamp(before.Add(nlActive.Nanoseconds(), 0)) if exp.Less(minExp) { @@ -284,8 +276,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) // Send one more heartbeat. Should update liveness record. - liveness, err = nl.Self() - require.NoError(t, err) + liveness, ok = nl.Self() + require.True(t, ok) require.NoError(t, nl.Heartbeat(ctx, liveness)) require.Equal(t, hbBefore+2, nl.Metrics().HeartbeatSuccesses.Count()) require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) @@ -317,10 +309,8 @@ func TestNodeIsLiveCallback(t *testing.T) { // Trigger a manual heartbeat and verify callbacks for each node ID are invoked. for _, nl := range mtc.nodeLivenesses { - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) if err := nl.Heartbeat(context.Background(), l); err != nil { t.Fatal(err) } @@ -376,10 +366,8 @@ func TestNodeHeartbeatCallback(t *testing.T) { // store. mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) for _, nl := range mtc.nodeLivenesses { - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) if err := nl.Heartbeat(context.Background(), l); err != nil { t.Fatal(err) } @@ -410,10 +398,8 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { // First try to increment the epoch of a known-live node. 
deadNodeID := mtc.gossips[1].NodeID.Get() - oldLiveness, err := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) - if err != nil { - t.Fatal(err) - } + oldLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) + assert.True(t, ok) if err := mtc.nodeLivenesses[0].IncrementEpoch( ctx, oldLiveness.Liveness, ); !testutils.IsError(err, "cannot increment epoch on live node") { @@ -428,9 +414,9 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { // Verify that the epoch has been advanced. testutils.SucceedsSoon(t, func() error { - newLiveness, err := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) - if err != nil { - return err + newLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) + if !ok { + return errors.New("liveness not found") } if newLiveness.Epoch != oldLiveness.Epoch+1 { return errors.Errorf("expected epoch to increment") @@ -545,9 +531,12 @@ func TestNodeLivenessSelf(t *testing.T) { // callback. var liveness kvserver.LivenessRecord testutils.SucceedsSoon(t, func() error { - var err error - liveness, err = mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) - return err + l, ok := mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) + if !ok { + return errors.New("liveness not found") + } + liveness = l + return nil }) if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { t.Fatal(err) @@ -575,13 +564,11 @@ func TestNodeLivenessSelf(t *testing.T) { // Self should not see the fake liveness, but have kept the real one. l := mtc.nodeLivenesses[0] - lGetRec, err := l.GetLiveness(g.NodeID.Get()) - require.NoError(t, err) + lGetRec, ok := l.GetLiveness(g.NodeID.Get()) + require.True(t, ok) lGet := lGetRec.Liveness - lSelf, err := l.Self() - if err != nil { - t.Fatal(err) - } + lSelf, ok := l.Self() + assert.True(t, ok) if !reflect.DeepEqual(lGet, lSelf) { t.Errorf("expected GetLiveness() to return same value as Self(): %+v != %+v", lGet, lSelf) } @@ -616,9 +603,9 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) var liveness kvserver.LivenessRecord testutils.SucceedsSoon(t, func() error { - livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) - if err != nil { - return err + livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + if !ok { + return errors.New("liveness not found") } liveness = livenessRec return nil @@ -680,9 +667,9 @@ func TestNodeLivenessGetLivenesses(t *testing.T) { mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) var liveness kvserver.LivenessRecord testutils.SucceedsSoon(t, func() error { - livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) - if err != nil { - return err + livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + if !ok { + return errors.New("liveness not found") } liveness = livenessRec return nil @@ -729,10 +716,8 @@ func TestNodeLivenessConcurrentHeartbeats(t *testing.T) { // Advance clock past the liveness threshold & concurrently heartbeat node. 
nl := mtc.nodeLivenesses[0] mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) errCh := make(chan error, concurrency) for i := 0; i < concurrency; i++ { go func() { @@ -763,10 +748,8 @@ func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) { // Advance the clock and this time increment epoch concurrently for node 1. nl := mtc.nodeLivenesses[0] mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) - l, err := nl.GetLiveness(mtc.gossips[1].NodeID.Get()) - if err != nil { - t.Fatal(err) - } + l, ok := nl.GetLiveness(mtc.gossips[1].NodeID.Get()) + assert.True(t, ok) errCh := make(chan error, concurrency) for i := 0; i < concurrency; i++ { go func() { @@ -915,10 +898,8 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) { verifyLiveness(t, mtc) nl := mtc.nodeLivenesses[0] - l, err := nl.Self() - if err != nil { - t.Fatal(err) - } + l, ok := nl.Self() + assert.True(t, ok) // And again on manual heartbeat. injectError.Store(true) @@ -1099,8 +1080,8 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { // Verify success on failed update of a liveness record that already has the // given decommissioning setting. - oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID) - assert.Nil(t, err) + oldLivenessRec, ok := callerNodeLiveness.GetLiveness(nodeID) + assert.True(t, ok) if _, err := callerNodeLiveness.SetDecommissioningInternal( ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE, ); err != nil { diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index 6ae6c71e3d4d..b5a475aaa4ee 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -165,8 +165,7 @@ func (rgcq *replicaGCQueue) shouldQueue( // dormant ranges. Make sure NodeLiveness isn't nil because it can be in // tests/benchmarks. if repl.store.cfg.NodeLiveness != nil { - if liveness, err := repl.store.cfg.NodeLiveness.Self(); err == nil && - !liveness.Membership.Active() { + if liveness, ok := repl.store.cfg.NodeLiveness.Self(); ok && !liveness.Membership.Active() { return true, replicaGCPriorityDefault } } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 2a6f4c379516..b4ab6989c3ab 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -224,12 +224,12 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( *reqLease.Expiration = status.Timestamp.Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0) } else { // Get the liveness for the next lease holder and set the epoch in the lease request. 
- liveness, err := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID) - if err != nil { + liveness, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID) + if !ok { llHandle.resolve(roachpb.NewError(&roachpb.LeaseRejectedError{ Existing: status.Lease, Requested: reqLease, - Message: fmt.Sprintf("couldn't request lease for %+v: %v", nextLeaseHolder, err), + Message: fmt.Sprintf("couldn't request lease for %+v: %v", nextLeaseHolder, errLivenessRecordCacheMiss), })) return llHandle } @@ -543,17 +543,17 @@ func (r *Replica) leaseStatus( if lease.Type() == roachpb.LeaseExpiration { expiration = lease.GetExpiration() } else { - l, err := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) + l, ok := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID) status.Liveness = l.Liveness - if err != nil || status.Liveness.Epoch < lease.Epoch { + if !ok || status.Liveness.Epoch < lease.Epoch { // If lease validity can't be determined (e.g. gossip is down // and liveness info isn't available for owner), we can neither // use the lease nor do we want to attempt to acquire it. - if err != nil { + if !ok { if leaseStatusLogLimiter.ShouldLog() { ctx = r.AnnotateCtx(ctx) log.Warningf(ctx, "can't determine lease status of %s due to node liveness error: %+v", - lease.Replica, err) + lease.Replica, errLivenessRecordCacheMiss) } } status.State = kvserverpb.LeaseState_ERROR diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 4a3b25b79a7e..77c59440f0e8 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -103,8 +103,8 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc return func( nodeID roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration, ) kvserverpb.NodeLivenessStatus { - liveness, err := nodeLiveness.GetLiveness(nodeID) - if err != nil { + liveness, ok := nodeLiveness.GetLiveness(nodeID) + if !ok { return kvserverpb.NodeLivenessStatus_UNAVAILABLE } return LivenessStatus(liveness.Liveness, now, timeUntilStoreDead) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 70fe1b236eb9..73560774a8a6 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1344,9 +1344,9 @@ func (s *adminServer) checkReadinessForHealthCheck() error { // TODO(knz): update this code when progress is made on // https://github.com/cockroachdb/cockroach/issues/45123 - l, err := s.server.nodeLiveness.GetLiveness(s.server.NodeID()) - if err != nil { - return s.serverError(err) + l, ok := s.server.nodeLiveness.GetLiveness(s.server.NodeID()) + if !ok { + return status.Error(codes.Unavailable, "liveness record not found") } if !l.IsLive(s.server.clock.Now().GoTime()) { return status.Errorf(codes.Unavailable, "node is not healthy") @@ -1682,9 +1682,9 @@ func (s *adminServer) DecommissionStatus( var res serverpb.DecommissionStatusResponse for nodeID := range replicaCounts { - l, err := s.server.nodeLiveness.GetLiveness(nodeID) - if err != nil { - return nil, errors.Wrapf(err, "unable to get liveness for %d", nodeID) + l, ok := s.server.nodeLiveness.GetLiveness(nodeID) + if !ok { + return nil, errors.Newf("unable to get liveness for %d", nodeID) } nodeResp := serverpb.DecommissionStatusResponse_Status{ NodeID: l.NodeID, diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go index f86ab21b32eb..a738f0541cc0 100644 --- a/pkg/server/admin_test.go +++ b/pkg/server/admin_test.go @@ -1349,10 +1349,8 @@ func TestHealthAPI(t *testing.T) { // server's clock. 
ts := s.(*TestServer) defer ts.nodeLiveness.PauseAllHeartbeatsForTest()() - self, err := ts.nodeLiveness.Self() - if err != nil { - t.Fatal(err) - } + self, ok := ts.nodeLiveness.Self() + assert.True(t, ok) s.Clock().Update(hlc.Timestamp(self.Expiration).Add(1, 0)) var resp serverpb.HealthResponse diff --git a/pkg/sql/optionalnodeliveness/node_liveness.go b/pkg/sql/optionalnodeliveness/node_liveness.go index 1cb5e07039a7..37892a4e164a 100644 --- a/pkg/sql/optionalnodeliveness/node_liveness.go +++ b/pkg/sql/optionalnodeliveness/node_liveness.go @@ -18,7 +18,7 @@ import ( // Interface is the interface used in Container. type Interface interface { - Self() (kvserverpb.Liveness, error) + Self() (kvserverpb.Liveness, bool) GetLivenesses() []kvserverpb.Liveness IsLive(roachpb.NodeID) (bool, error) }
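
The signature changes above replace the `(Liveness, error)` returns of `Self` and `GetLiveness` with `(Liveness, bool)`, where the boolean only reports whether a liveness record was found in the local cache; callers now construct their own error when it is missing (as the `errLivenessRecordCacheMiss` and `errors.Newf` call sites in the diffs do). A minimal sketch of the resulting calling convention follows — the helper name is hypothetical and not part of this patch, and it assumes the exported `kvserver` types shown in the diffs:

package livenessexample

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// epochForNode is a hypothetical helper illustrating the new calling
// convention: GetLiveness signals a cache miss through the boolean
// instead of returning an error, and the caller decides how to surface
// the failure in its own context.
func epochForNode(nl *kvserver.NodeLiveness, nodeID roachpb.NodeID) (int64, error) {
	rec, ok := nl.GetLiveness(nodeID)
	if !ok {
		// No record in the cache; synthesize a caller-specific error.
		return 0, errors.Newf("liveness record for n%d not found", nodeID)
	}
	// LivenessRecord embeds the Liveness proto, so Epoch is accessible directly.
	return rec.Epoch, nil
}
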