diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go
index 3617d0a4669c..31ab6c44060d 100644
--- a/pkg/cmd/roachtest/decommission.go
+++ b/pkg/cmd/roachtest/decommission.go
@@ -309,21 +309,6 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) {
 		Multiplier: 2,
 	}
 
-	// This is a pretty gross hack to let the bootstrap info (cluster ID,
-	// liveness records) disseminate through the cluster. Since it's no longer
-	// happening through gossip, it takes a bit longer to happen. We should do
-	// two things to improve our story here:
-	//
-	// - We should opportunistically write to the liveness table when adding a
-	//   node through the Join RPC. This would also simplify the handling of
-	//   empty liveness records (they would no longer exist).
-	// - We should add roachtest helpers that wait until each node has received
-	//   cluster ID information, and use it in all the tests that need it (which
-	//   may very well be all the tests).
-	//
-	// TODO(irfansharif): Do the above.
-	time.Sleep(30 * time.Second)
-
 	// Partially decommission then recommission a random node, from another
 	// random node. Run a couple of status checks while doing so.
 	{
diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go
index 237440087b38..15424fbe9526 100644
--- a/pkg/kv/kvserver/client_test.go
+++ b/pkg/kv/kvserver/client_test.go
@@ -1009,6 +1009,15 @@ func (m *multiTestContext) addStore(idx int) {
 	}{
 		ch: make(chan struct{}),
 	}
+	if idx != 0 {
+		// Given multiTestContext does not make use of the join RPC, we have to
+		// manually write out liveness records for each node to maintain the
+		// invariant that all nodes have liveness records present before they
+		// start heartbeating.
+		if err := m.nodeLivenesses[idx].CreateLivenessRecord(ctx, nodeID); err != nil {
+			m.t.Fatal(err)
+		}
+	}
 	m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) {
 		now := clock.Now()
 		if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go
index da0e2bf81b4e..06af2f312940 100644
--- a/pkg/kv/kvserver/node_liveness.go
+++ b/pkg/kv/kvserver/node_liveness.go
@@ -450,6 +450,56 @@ type livenessUpdate struct {
 	oldRaw []byte
 }
 
+// CreateLivenessRecord creates a liveness record for the node specified by the
+// given node ID. This is typically used when adding a new node to a running
+// cluster, or when bootstrapping a cluster through a given node.
+//
+// This is a pared down version of StartHeartbeat; it exists only to durably
+// persist a liveness record marking the node's existence. Nodes will heartbeat
+// their records after starting up, incrementing to epoch=1 when doing so, at
+// which point we'll set an appropriate expiration timestamp, gossip the
+// liveness record, and update our in-memory representation of it.
+//
+// NB: An existing liveness record is not overwritten by this method; we return
+// an error instead.
+func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb.NodeID) error {
+	// We start off at epoch=0, entrusting the initial heartbeat to increment it
+	// to epoch=1 to signal the very first time the node is up and running.
+	liveness := kvserverpb.Liveness{NodeID: nodeID, Epoch: 0}
+
+	// We skip adding an expiration; we only really care about the liveness
+	// record existing within KV.
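	// [Editorial note, not part of the patch: an illustrative sketch of the
	// record's lifecycle.] What gets persisted below is roughly
	//
	//   kvserverpb.Liveness{NodeID: nodeID, Epoch: 0}   // no Expiration set
	//
	// and the node's first heartbeat (StartHeartbeat -> heartbeatInternal) is
	// what later turns it into something like
	//
	//   kvserverpb.Liveness{NodeID: nodeID, Epoch: 1, Expiration: <now + liveness duration>}
	//
	// The expiration shown here is an assumption for illustration; the exact
	// arithmetic lives in heartbeatInternal.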
+
+	v := new(roachpb.Value)
+	if err := nl.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		b := txn.NewBatch()
+		key := keys.NodeLivenessKey(nodeID)
+		if err := v.SetProto(&liveness); err != nil {
+			log.Fatalf(ctx, "failed to marshal proto: %s", err)
+		}
+		// Given we're looking to create a new liveness record here, we don't
+		// expect to find anything.
+		b.CPut(key, v, nil)
+
+		// We don't bother adding a gossip trigger; that'll happen with the
+		// first heartbeat. We still keep it as a 1PC commit to avoid leaving
+		// write intents.
+		b.AddRawRequest(&roachpb.EndTxnRequest{
+			Commit:     true,
+			Require1PC: true,
+		})
+		return txn.Run(ctx, b)
+	}); err != nil {
+		return err
+	}
+
+	// We'll learn about this liveness record through gossip eventually, so we
+	// don't bother updating our in-memory view of node liveness.
+
+	log.Infof(ctx, "created liveness record for n%d", nodeID)
+	return nil
+}
+
 func (nl *NodeLiveness) setMembershipStatusInternal(
 	ctx context.Context,
 	nodeID roachpb.NodeID,
@@ -461,16 +511,10 @@ func (nl *NodeLiveness) setMembershipStatusInternal(
 	if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) {
 		// Liveness record didn't previously exist, so we create one.
 		//
-		// TODO(irfansharif): This code feels a bit unwieldy because it's
-		// possible for a liveness record to not exist previously. It is just
-		// generally difficult to write it at startup. When a node joins the
-		// cluster, this completes before it has had a chance to write its
-		// liveness record. If it gets decommissioned immediately, there won't
-		// be one yet. The Connect RPC can solve this though, I think? We can
-		// bootstrap clusters with a liveness record for n1. Any other node at
-		// some point has to join the cluster for the first time via the Connect
-		// RPC, which as part of its job can make sure the liveness record
-		// exists before responding to the new node.
+		// TODO(irfansharif): The above is no longer possible. We always
+		// create one (see CreateLivenessRecord, WriteInitialClusterData) when
+		// adding a node to the cluster. We should clean up all this logic that
+		// tries to work around the liveness record possibly not existing.
 		newLiveness = kvserverpb.Liveness{
 			NodeID: nodeID,
 			Epoch:  1,
@@ -587,11 +631,11 @@ func (nl *NodeLiveness) StartHeartbeat(
 		func(ctx context.Context) error {
 			// Retry heartbeat in the event the conditional put fails.
 			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-				liveness, err := nl.Self()
+				oldLiveness, err := nl.Self()
 				if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
 					log.Errorf(ctx, "unexpected error getting liveness: %+v", err)
 				}
-				if err := nl.heartbeatInternal(ctx, liveness, incrementEpoch); err != nil {
+				if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil {
 					if errors.Is(err, ErrEpochIncremented) {
 						log.Infof(ctx, "%s; retrying", err)
 						continue
@@ -737,7 +781,7 @@ func (nl *NodeLiveness) heartbeatInternal(
 
 	// If we are not intending to increment the node's liveness epoch, detect
 	// whether this heartbeat is needed anymore. It is possible that we queued
-	// for long enough on the sempahore such that other heartbeat attempts ahead
+	// for long enough on the semaphore such that other heartbeat attempts ahead
 	// of us already incremented the expiration past what we wanted. Note that
 	// if we allowed the heartbeat to proceed in this case, we know that it
 	// would hit a ConditionFailedError and return a errNodeAlreadyLive down
@@ -751,20 +795,69 @@ func (nl *NodeLiveness) heartbeatInternal(
 
 	// Let's compute what our new liveness record should be.
 	var newLiveness kvserverpb.Liveness
-	if oldLiveness == (kvserverpb.Liveness{}) {
-		// Liveness record didn't previously exist, so we create one.
-		newLiveness = kvserverpb.Liveness{
-			NodeID: nodeID,
-			Epoch:  1,
-		}
-	} else {
+	if oldLiveness != (kvserverpb.Liveness{}) {
+		// Start off with our existing view of liveness.
 		newLiveness = oldLiveness
-		if incrementEpoch {
-			newLiveness.Epoch++
-			newLiveness.Draining = false // Clear draining field.
+	} else {
+		// We don't yet know about our own liveness record (which does exist; we
+		// maintain the invariant that there's always a liveness record for
+		// every given node). Let's retrieve it from KV before proceeding.
+		//
+		// If we didn't previously know about our liveness record, it indicates
+		// that we're heartbeating for the very first time.
+		kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID))
+		if err != nil {
+			return errors.Wrap(err, "unable to get liveness")
+		}
+
+		if kv.Value != nil {
+			// This is the happy path. Let's unpack the liveness record we found
+			// within KV, and use that to inform what our new liveness should
+			// be.
+			if err := kv.Value.GetProto(&oldLiveness); err != nil {
+				return errors.Wrap(err, "invalid liveness record")
+			}
+
+			oldLivenessRec := LivenessRecord{
+				Liveness: oldLiveness,
+				raw:      kv.Value.TagAndDataBytes(),
+			}
+
+			// Update our cache with the liveness record we just found.
+			nl.maybeUpdate(oldLivenessRec)
+
+			newLiveness = oldLiveness
+		} else {
+			// This is a "should basically never happen" scenario given our
+			// invariant around always persisting liveness records on node
+			// startup. But that was a change we added in 20.2. Though unlikely,
+			// it's possible to get into the following scenario:
+			//
+			// - v20.1 node gets added to v20.1 cluster, and is quickly removed
+			//   before being able to persist its liveness record.
+			// - The cluster is upgraded to v20.2.
+			// - The node from earlier is rolled into v20.2, and re-added to the
+			//   cluster.
+			// - It's never able to successfully heartbeat (it didn't join
+			//   through the join rpc, bootstrap, or gossip). Welp.
+			//
+			// Given this possibility, we'll just fall back to creating the
+			// liveness record here as we did in v20.1 code.
+			//
+			// TODO(irfansharif): Remove this once v20.2 is cut.
+			log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID)
+			newLiveness = kvserverpb.Liveness{
+				NodeID: nodeID,
+				Epoch:  0, // incremented to epoch=1 below as needed
+			}
 		}
 	}
+	if incrementEpoch {
+		newLiveness.Epoch++
+		newLiveness.Draining = false // clear draining field
+	}
+
 	// Grab a new clock reading to compute the new expiration time,
 	// since we may have queued on the semaphore for a while.
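	// [Editorial note, not part of the patch: a sketch of the step below,
	// assuming the configured liveness duration is available on the receiver
	// as nl.livenessThreshold.] The fresh clock reading feeds the record's new
	// expiration, roughly:
	//
	//   afterQueueTS := nl.clock.Now()
	//   newLiveness.Expiration = hlc.LegacyTimestamp(
	//       afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
	//
	// so a heartbeat that waited on the semaphore still extends the expiration
	// relative to "now", not relative to when it was queued.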
 	afterQueueTS := nl.clock.Now()
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index e628cca8e51b..4fc9aab9d70f 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -159,6 +159,49 @@ func TestNodeLivenessInitialIncrement(t *testing.T) {
 	verifyEpochIncremented(t, mtc, 0)
 }
 
+// TestNodeLivenessAppearsAtStart tests that liveness records are written right
+// when nodes are added to the cluster (during bootstrap, and when connecting to
+// a bootstrapped node). The test verifies that the liveness records found are
+// what we expect them to be.
+func TestNodeLivenessAppearsAtStart(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+
+	// At this point StartTestCluster has waited for all nodes to become live.
+
+	// Verify liveness records exist for all nodes.
+	for i := 0; i < tc.NumServers(); i++ {
+		nodeID := tc.Server(i).NodeID()
+		nl := tc.Server(i).NodeLiveness().(*kvserver.NodeLiveness)
+
+		if live, err := nl.IsLive(nodeID); err != nil {
+			t.Fatal(err)
+		} else if !live {
+			t.Fatalf("node %d not live", nodeID)
+		}
+
+		livenessRec, err := nl.GetLiveness(nodeID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if livenessRec.NodeID != nodeID {
+			t.Fatalf("expected node ID %d, got %d", nodeID, livenessRec.NodeID)
+		}
+		// We expect epoch=1 as nodes first create a liveness record at epoch=0,
+		// and then increment it during their first heartbeat.
+		if livenessRec.Epoch != 1 {
+			t.Fatalf("expected epoch=1, got epoch=%d", livenessRec.Epoch)
+		}
+		if !livenessRec.Membership.Active() {
+			t.Fatalf("expected membership=active, got membership=%s", livenessRec.Membership)
+		}
+	}
+}
+
 func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) {
 	testutils.SucceedsSoon(t, func() error {
 		liveness, err := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get())
diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_bootstrap.go
index 3520985f3214..facd19a247aa 100644
--- a/pkg/kv/kvserver/store_bootstrap.go
+++ b/pkg/kv/kvserver/store_bootstrap.go
@@ -14,6 +14,7 @@ import (
 	"context"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -96,16 +97,34 @@ func WriteInitialClusterData(
 		roachpb.KeyValue{Key: keys.BootstrapVersionKey, Value: bootstrapVal})
 
 	// Initialize various sequence generators.
-	var nodeIDVal, storeIDVal, rangeIDVal roachpb.Value
-	nodeIDVal.SetInt(1) // This node has id 1.
+	var nodeIDVal, storeIDVal, rangeIDVal, livenessVal roachpb.Value
+
+	const firstNodeID = 1 // This node has id 1.
+	nodeIDVal.SetInt(firstNodeID)
 	// The caller will initialize the stores with ids 1..numStores.
 	storeIDVal.SetInt(int64(numStores))
 	// The last range has id = len(splits) + 1
 	rangeIDVal.SetInt(int64(len(splits) + 1))
+
+	// We're the first node in the cluster; let's seed our liveness record.
+	// It's crucial that we do so to maintain the invariant that there's always
+	// a liveness record for a given node. We'll do something similar through
+	// the join RPC when adding new nodes to an already bootstrapped cluster [1].
+	//
+	// We start off at epoch=0; when nodes heartbeat their liveness records for
+	// the first time it'll get incremented to epoch=1 [2].
+	//
+	// [1]: See `CreateLivenessRecord` and usages for where that happens.
+	// [2]: See `StartHeartbeat` for where that happens.
+	livenessRecord := kvserverpb.Liveness{NodeID: 1, Epoch: 0}
+	if err := livenessVal.SetProto(&livenessRecord); err != nil {
+		return err
+	}
 	initialValues = append(initialValues,
 		roachpb.KeyValue{Key: keys.NodeIDGenerator, Value: nodeIDVal},
 		roachpb.KeyValue{Key: keys.StoreIDGenerator, Value: storeIDVal},
-		roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal})
+		roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal},
+		roachpb.KeyValue{Key: keys.NodeLivenessKey(firstNodeID), Value: livenessVal})
 
 	// firstRangeMS is going to accumulate the stats for the first range, as we
 	// write the meta records for all the other ranges.
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 390ec8efffa8..6add213574fe 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -377,6 +377,12 @@ func (n *Node) start(
 		log.Infof(ctxWithSpan, "new node allocated ID %d", newID)
 		span.Finish()
 		nodeID = newID
+
+		// We're joining via gossip, so we don't have a liveness record for
+		// ourselves yet. Let's create one while we're here.
+		if err := n.storeCfg.NodeLiveness.CreateLivenessRecord(ctx, nodeID); err != nil {
+			return err
+		}
 	}
 
 	// Inform the RPC context of the node ID.
@@ -1128,10 +1134,6 @@ func (n *Node) GossipSubscription(
 // Join implements the roachpb.InternalServer service. This is the
 // "connectivity" API; individual CRDB servers are passed in a --join list and
 // the join targets are addressed through this API.
-//
-// TODO(irfansharif): Perhaps we could opportunistically create a liveness
-// record here so as to no longer have to worry about the liveness record not
-// existing for a given node.
 func (n *Node) Join(
 	ctx context.Context, req *roachpb.JoinNodeRequest,
 ) (*roachpb.JoinNodeResponse, error) {
@@ -1153,6 +1155,18 @@ func (n *Node) Join(
 		return nil, err
 	}
 
+	// We create a liveness record for the joining node here. We do so to
+	// maintain the invariant that there's always a liveness record for a given
+	// node. See `WriteInitialClusterData` for the other codepath where we
+	// manually create a liveness record to maintain this same invariant.
+	//
+	// NB: This invariant will be required when we introduce long-running
+	// migrations. See https://github.com/cockroachdb/cockroach/pull/48843 for
+	// details.
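	// [Editorial note, not part of the patch.] CreateLivenessRecord issues a
	// CPut that expects no existing value, so if a record for this (freshly
	// allocated) node ID were somehow already present, the call below would
	// fail and the error would be returned to the prospective node rather than
	// the existing record being silently overwritten.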
+	if err := n.storeCfg.NodeLiveness.CreateLivenessRecord(ctx, nodeID); err != nil {
+		return nil, err
+	}
+
 	log.Infof(ctx, "allocated IDs: n%d, s%d", nodeID, storeID)
 
 	return &roachpb.JoinNodeResponse{
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index 3bff2249cf80..5452a2e59447 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -77,6 +77,7 @@ func TestBootstrapCluster(t *testing.T) {
 	for _, kv := range res.KVs {
 		foundKeys = append(foundKeys, kv.Key)
 	}
+	const firstNodeID = 1
 	var expectedKeys = keySlice{
 		testutils.MakeKey(roachpb.Key("\x02"), roachpb.KeyMax),
 		testutils.MakeKey(roachpb.Key("\x03"), roachpb.KeyMax),
@@ -84,6 +85,7 @@ func TestBootstrapCluster(t *testing.T) {
 		roachpb.Key("\x04node-idgen"),
 		roachpb.Key("\x04range-idgen"),
 		roachpb.Key("\x04store-idgen"),
+		keys.NodeLivenessKey(firstNodeID),
 	}
 	for _, splitKey := range config.StaticSplits() {
 		meta2Key := keys.RangeMetaKey(splitKey)
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 40aac91500f9..1bcabf8bbbc7 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1401,11 +1401,9 @@ func (s *Server) Start(ctx context.Context) error {
 	// one, make sure it's the clusterID we already know (and are guaranteed to
 	// know) at this point. If it's not the same, explode.
 	//
-	// TODO(tbg): remove this when we have changed ServeAndWait() to join an
-	// existing cluster via a one-off RPC, at which point we can create gossip
-	// (and thus the RPC layer) only after the clusterID is already known. We
-	// can then rely on the RPC layer's protection against cross-cluster
-	// communication.
+	// TODO(irfansharif): The above is no longer applicable; in 21.1 we can
+	// assume that the RPC layer will always get set up after having found out
+	// what the cluster ID is. The checks below can be removed then.
 	{
 		// We populated this above, so it should still be set. This is just to
 		// demonstrate that we're not doing anything functional here (and to
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index 3d41f08ac934..84c57e2dc0cf 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -342,6 +342,15 @@ func (ts *TestServer) MigrationManager() interface{} {
 	return nil
 }
 
+// NodeLiveness exposes the NodeLiveness instance used by the TestServer as an
+// interface{}.
+func (ts *TestServer) NodeLiveness() interface{} {
+	if ts != nil {
+		return ts.nodeLiveness
+	}
+	return nil
+}
+
 // RPCContext returns the rpc context used by the TestServer.
 func (ts *TestServer) RPCContext() *rpc.Context {
 	if ts != nil {
diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go
index df1e108a9f10..258139bf321f 100644
--- a/pkg/testutils/localtestcluster/local_test_cluster.go
+++ b/pkg/testutils/localtestcluster/local_test_cluster.go
@@ -221,6 +221,13 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
 		t.Fatalf("unable to start local test cluster: %s", err)
 	}
 
+	// The heartbeat loop depends on gossip to retrieve the node ID, so we make
+	// sure to set it first.
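	// [Editorial note, not part of the patch.] Concretely, the heartbeat loop
	// started further down resolves its node ID via gossip, so the node ID
	// container and node descriptor are now set before StartHeartbeat runs
	// instead of near the end of Start (see the removal below).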
+	nc.Set(ctx, nodeDesc.NodeID)
+	if err := ltc.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
+		t.Fatalf("unable to set node descriptor: %s", err)
+	}
+
 	if !ltc.DisableLivenessHeartbeat {
 		cfg.NodeLiveness.StartHeartbeat(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */)
 	}
@@ -230,10 +237,6 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
 	}
 	ltc.Stores.AddStore(ltc.Store)
 
-	nc.Set(ctx, nodeDesc.NodeID)
-	if err := ltc.Gossip.SetNodeDescriptor(nodeDesc); err != nil {
-		t.Fatalf("unable to set node descriptor: %s", err)
-	}
 	ltc.Cfg = cfg
 }
 
diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go
index fe5108e9bf35..39edd305316d 100644
--- a/pkg/testutils/serverutils/test_server_shim.go
+++ b/pkg/testutils/serverutils/test_server_shim.go
@@ -110,6 +110,10 @@ type TestServerInterface interface {
 	// MigrationManager returns the *jobs.Registry as an interface{}.
 	MigrationManager() interface{}
 
+	// NodeLiveness exposes the NodeLiveness instance used by the TestServer as an
+	// interface{}.
+	NodeLiveness() interface{}
+
 	// SetDistSQLSpanResolver changes the SpanResolver used for DistSQL inside the
 	// server's executor. The argument must be a physicalplan.SpanResolver
 	// instance.
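As a usage note for the new accessor: the `NodeLiveness()` method added to `TestServerInterface` and `TestServer` above is what lets tests reach the liveness instance through the serverutils shim, as `TestNodeLivenessAppearsAtStart` does earlier in this patch. A minimal sketch of that pattern (hypothetical test name; imports as used elsewhere in the patch):

func TestNodeLivenessAccessorSketch(t *testing.T) {
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	// The accessor returns an interface{}; callers assert it back to the
	// concrete kvserver type before using it.
	nl := tc.Server(0).NodeLiveness().(*kvserver.NodeLiveness)
	if live, err := nl.IsLive(tc.Server(0).NodeID()); err != nil || !live {
		t.Fatalf("expected n1 to be live; live=%t, err=%v", live, err)
	}
}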