From 5bcb69a246dea344af40d112569a899d34bdf6fd Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Oct 2020 11:49:45 +0200 Subject: [PATCH 1/3] keys: introduce NodeTombstoneKey These keys live in the store-local keyspace and contain nodes known to have been marked as decommissioned (=permanently removed from the cluster). These keys will be used to prevent tombstoned nodes from establishing RPC connections to the cluster (and vice versa). Release note: None --- pkg/keys/constants.go | 3 +++ pkg/keys/keys.go | 21 +++++++++++++++++++++ pkg/keys/printer.go | 17 +++++++++++++++-- pkg/keys/printer_test.go | 3 ++- 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 037381b014d8..528fc725217d 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -175,6 +175,9 @@ var ( // localStoreIdentSuffix stores an immutable identifier for this // store, created when the store is first bootstrapped. localStoreIdentSuffix = []byte("iden") + // localStoreNodeTombstoneSuffix stores key value pairs that map + // nodeIDs to time of removal from cluster. + localStoreNodeTombstoneSuffix = []byte("ntmb") // localStoreLastUpSuffix stores the last timestamp that a store's node // acknowledged that it was still running. This value will be regularly // refreshed on all stores for a running node; the intention of this value diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index 5948d4b26050..27b4efbc66a1 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -76,6 +76,27 @@ func StoreHLCUpperBoundKey() roachpb.Key { return MakeStoreKey(localStoreHLCUpperBoundSuffix, nil) } +// StoreNodeTombstoneKey returns the key for storing a node tombstone for nodeID. +func StoreNodeTombstoneKey(nodeID roachpb.NodeID) roachpb.Key { + return MakeStoreKey(localStoreNodeTombstoneSuffix, encoding.EncodeUint32Ascending(nil, uint32(nodeID))) +} + +// DecodeNodeTombstoneKey returns the NodeID for the node tombstone. +func DecodeNodeTombstoneKey(key roachpb.Key) (roachpb.NodeID, error) { + suffix, detail, err := DecodeStoreKey(key) + if err != nil { + return 0, err + } + if !suffix.Equal(localStoreNodeTombstoneSuffix) { + return 0, errors.Errorf("key with suffix %q != %q", suffix, localStoreNodeTombstoneSuffix) + } + detail, nodeID, err := encoding.DecodeUint32Ascending(detail) + if len(detail) != 0 { + return 0, errors.Errorf("invalid key has trailing garbage: %q", detail) + } + return roachpb.NodeID(nodeID), err +} + // StoreSuggestedCompactionKey returns a store-local key for a // suggested compaction. It combines the specified start and end keys. 
 func StoreSuggestedCompactionKey(start, end roachpb.Key) roachpb.Key {
diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go
index 0725897e2b22..22ae1cb78997 100644
--- a/pkg/keys/printer.go
+++ b/pkg/keys/printer.go
@@ -189,6 +189,7 @@ var constSubKeyDict = []struct {
 	{"/storeIdent", localStoreIdentSuffix},
 	{"/gossipBootstrap", localStoreGossipSuffix},
 	{"/clusterVersion", localStoreClusterVersionSuffix},
+	{"/nodeTombstone", localStoreNodeTombstoneSuffix},
 	{"/suggestedCompaction", localStoreSuggestedCompactionSuffix},
 }
 
@@ -200,6 +201,14 @@ func suggestedCompactionKeyPrint(key roachpb.Key) string {
 	return fmt.Sprintf("{%s-%s}", start, end)
 }
 
+func nodeTombstoneKeyPrint(key roachpb.Key) string {
+	nodeID, err := DecodeNodeTombstoneKey(key)
+	if err != nil {
+		return fmt.Sprintf("<invalid: %s>", err)
+	}
+	return fmt.Sprint(nodeID)
+}
+
 func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
 	for _, v := range constSubKeyDict {
 		if bytes.HasPrefix(key, v.key) {
@@ -207,6 +216,10 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
 				return v.name + "/" + suggestedCompactionKeyPrint(
 					append(roachpb.Key(nil), append(localStorePrefix, key...)...),
 				)
+			} else if v.key.Equal(localStoreNodeTombstoneSuffix) {
+				return v.name + "/" + nodeTombstoneKeyPrint(
+					append(roachpb.Key(nil), append(localStorePrefix, key...)...),
+				)
 			}
 			return v.name
 		}
@@ -218,8 +231,8 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
 func localStoreKeyParse(input string) (remainder string, output roachpb.Key) {
 	for _, s := range constSubKeyDict {
 		if strings.HasPrefix(input, s.name) {
-			if s.key.Equal(localStoreSuggestedCompactionSuffix) {
-				panic(&ErrUglifyUnsupported{errors.New("cannot parse suggested compaction key")})
+			if s.key.Equal(localStoreSuggestedCompactionSuffix) || s.key.Equal(localStoreNodeTombstoneSuffix) {
+				panic(&ErrUglifyUnsupported{errors.Errorf("cannot parse local store key with suffix %s", s.key)})
 			}
 			output = MakeStoreKey(s.key, nil)
 			return
diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go
index 0d7a13a3729c..990c7bf2d6ee 100644
--- a/pkg/keys/printer_test.go
+++ b/pkg/keys/printer_test.go
@@ -59,6 +59,7 @@ func TestPrettyPrint(t *testing.T) {
 		{keys.StoreIdentKey(), "/Local/Store/storeIdent", revertSupportUnknown},
 		{keys.StoreGossipKey(), "/Local/Store/gossipBootstrap", revertSupportUnknown},
 		{keys.StoreClusterVersionKey(), "/Local/Store/clusterVersion", revertSupportUnknown},
+		{keys.StoreNodeTombstoneKey(123), "/Local/Store/nodeTombstone/123", revertSupportUnknown},
 		{keys.StoreSuggestedCompactionKey(keys.MinKey, roachpb.Key("b")), `/Local/Store/suggestedCompaction/{/Min-"b"}`, revertSupportUnknown},
 		{keys.StoreSuggestedCompactionKey(roachpb.Key("a"), roachpb.Key("b")), `/Local/Store/suggestedCompaction/{"a"-"b"}`, revertSupportUnknown},
 		{keys.StoreSuggestedCompactionKey(roachpb.Key("a"), keys.MaxKey), `/Local/Store/suggestedCompaction/{"a"-/Max}`, revertSupportUnknown},
@@ -314,7 +315,7 @@ exp: %s
 				t.Errorf("%d: ugly print expected unexpectedly unsupported (%s)", i, test.exp)
 			}
 		} else if exp, act := test.key, parsed; !bytes.Equal(exp, act) {
-			t.Errorf("%d: ugly print expected %q, got %q", i, exp, act)
+			t.Errorf("%d: ugly print expected '%q', got '%q'", i, exp, act)
 		}
 		if t.Failed() {
 			return

From 3cde9d5990bc816cace81c58674061265fcf3d47 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Wed, 7 Oct 2020 12:09:08 +0200
Subject: [PATCH 2/3] server: introduce node tombstone storage

This introduces the component that provides a cached view
of the locally persisted node tombstones. This component is
populated via NodeLiveness (which in turn listens to Gossip) and
queried in the RPC layer, meaning that in effect, after the gossip
info has propagated out, nodes will, within a heartbeat timeout
(a few seconds), cease communication with the decommissioned node.

I verified manually that the refusal message makes it to the logs
on the decommissioned node.

Some changes to the RPC connection pool were necessary to get the
desired behavior. Essentially, the pool holds on to and hands out
existing connections by default, meaning that the injected errors
would be ignored if they started occurring on an already active
connection (as in `TestDecommissionedNodeCannotConnect`). We need
to make sure the heartbeat loop actually *returns* these injected
errors, which is now the case.

Release note: None
---
 pkg/rpc/context.go                        |  25 +++-
 pkg/server/connectivity_test.go           |  58 ++++++++++
 pkg/server/node_tombstone_storage.go      | 132 ++++++++++++++++++++++
 pkg/server/node_tombstone_storage_test.go |  81 +++++++++++++
 pkg/server/server.go                      |  42 +++++--
 5 files changed, 327 insertions(+), 11 deletions(-)
 create mode 100644 pkg/server/node_tombstone_storage.go
 create mode 100644 pkg/server/node_tombstone_storage_test.go

diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go
index 9bfbd132532e..2cf5a4b8a676 100644
--- a/pkg/rpc/context.go
+++ b/pkg/rpc/context.go
@@ -45,10 +45,12 @@ import (
 	"golang.org/x/sync/syncmap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/encoding"
 	encodingproto "google.golang.org/grpc/encoding/proto"
 	"google.golang.org/grpc/metadata"
+	grpcstatus "google.golang.org/grpc/status"
 )
 
 func init() {
@@ -1131,6 +1133,13 @@ func (ctx *Context) runHeartbeat(
 	// Give the first iteration a wait-free heartbeat attempt.
 	heartbeatTimer.Reset(0)
 	everSucceeded := false
+	// Both transient and permanent errors can arise here. Transient errors
+	// set the `heartbeatResult.err` field but retain the connection.
+	// Permanent errors return an error from this method, which means that
+	// the connection will be removed. Errors are presumed transient by
+	// default, but some - like ClusterID or version mismatches, as well as
+	// PermissionDenied errors injected by OnSendPing, are considered permanent.
+	returnErr := false
 	for {
 		select {
 		case <-redialChan:
@@ -1164,6 +1173,7 @@ func (ctx *Context) runHeartbeat(
 			// NB: We want the request to fail-fast (the default), otherwise we won't
 			// be notified of transport failures.
if err := interceptor(request); err != nil { + returnErr = true return err } var err error @@ -1177,9 +1187,13 @@ func (ctx *Context) runHeartbeat( err = ping(goCtx) } + if s, ok := grpcstatus.FromError(errors.UnwrapAll(err)); ok && s.Code() == codes.PermissionDenied { + returnErr = true + } + if err == nil { // We verify the cluster name on the initiator side (instead - // of the hearbeat service side, as done for the cluster ID + // of the heartbeat service side, as done for the cluster ID // and node ID checks) so that the operator who is starting a // new node in a cluster and mistakenly joins the wrong // cluster gets a chance to see the error message on their @@ -1188,6 +1202,9 @@ func (ctx *Context) runHeartbeat( err = errors.Wrap( checkClusterName(ctx.Config.ClusterName, response.ClusterName), "cluster name check failed on ping response") + if err != nil { + returnErr = true + } } } @@ -1195,6 +1212,9 @@ func (ctx *Context) runHeartbeat( err = errors.Wrap( checkVersion(goCtx, ctx.Settings, response.ServerVersion), "version compatibility check failed on ping response") + if err != nil { + returnErr = true + } } if err == nil { @@ -1232,6 +1252,9 @@ func (ctx *Context) runHeartbeat( state = updateHeartbeatState(&ctx.metrics, state, hr.state()) conn.heartbeatResult.Store(hr) setInitialHeartbeatDone() + if returnErr { + return err + } return nil }); err != nil { return err diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go index dc6fbe12fbf4..f8ecd9296508 100644 --- a/pkg/server/connectivity_test.go +++ b/pkg/server/connectivity_test.go @@ -20,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -29,7 +31,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" ) // TestClusterConnectivity sets up an uninitialized cluster with custom join @@ -332,3 +337,56 @@ func TestJoinVersionGate(t *testing.T) { t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error()) } } + +func TestDecommissionedNodeCannotConnect(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + numNodes := 3 + tcArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // saves time + } + + tc := testcluster.StartTestCluster(t, numNodes, tcArgs) + defer tc.Stopper().Stop(ctx) + + for _, status := range []kvserverpb.MembershipStatus{ + kvserverpb.MembershipStatus_DECOMMISSIONING, kvserverpb.MembershipStatus_DECOMMISSIONED, + } { + require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3})) + } + + testutils.SucceedsSoon(t, func() error { + for _, idx := range []int{0, 1} { + clusterSrv := tc.Server(idx) + decomSrv := tc.Server(2) + + // Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3. 
+			{
+				_, err := clusterSrv.RPCContext().GRPCDialNode(
+					decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
+				).Connect(ctx)
+				cause := errors.UnwrapAll(err)
+				s, ok := grpcstatus.FromError(cause)
+				if !ok || s.Code() != codes.PermissionDenied {
+					return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
+				}
+			}
+
+			// And similarly, n3 will be refused by n1, n2.
+			{
+				_, err := decomSrv.RPCContext().GRPCDialNode(
+					clusterSrv.RPCAddr(), clusterSrv.NodeID(), rpc.DefaultClass,
+				).Connect(ctx)
+				cause := errors.UnwrapAll(err)
+				s, ok := grpcstatus.FromError(cause)
+				if !ok || s.Code() != codes.PermissionDenied {
+					return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
+				}
+			}
+		}
+		return nil
+	})
+}
diff --git a/pkg/server/node_tombstone_storage.go b/pkg/server/node_tombstone_storage.go
new file mode 100644
index 000000000000..2513731d83c8
--- /dev/null
+++ b/pkg/server/node_tombstone_storage.go
@@ -0,0 +1,132 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package server
+
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+)
+
+// nodeTombstoneStorage maintains a local (i.e. unreplicated)
+// registry of which nodes were permanently removed from the cluster.
+type nodeTombstoneStorage struct {
+	engs []storage.Engine
+	mu   struct {
+		syncutil.RWMutex
+		// cache contains both positive and negative hits. Positive hits
+		// unconditionally override negative hits.
+		cache map[roachpb.NodeID]time.Time
+	}
+}
+
+func (s *nodeTombstoneStorage) key(nodeID roachpb.NodeID) roachpb.Key {
+	return keys.StoreNodeTombstoneKey(nodeID)
+}
+
+// IsDecommissioned returns when (in UTC) a node was decommissioned
+// (i.e. was permanently removed from the cluster). If not removed,
+// returns the zero time.
+//
+// Errors are not returned during normal operation.
+func (s *nodeTombstoneStorage) IsDecommissioned(
+	ctx context.Context, nodeID roachpb.NodeID,
+) (time.Time, error) {
+	s.mu.RLock()
+	ts, ok := s.mu.cache[nodeID]
+	s.mu.RUnlock()
+	if ok {
+		// Cache hit.
+		return ts, nil
+	}
+
+	// No cache hit.
+	k := s.key(nodeID)
+	for _, eng := range s.engs {
+		v, _, err := storage.MVCCGet(ctx, eng, k, hlc.Timestamp{}, storage.MVCCGetOptions{})
+		if err != nil {
+			return time.Time{}, err
+		}
+		if v == nil {
+			// Not found.
+			continue
+		}
+		var tsp hlc.Timestamp
+		if err := v.GetProto(&tsp); err != nil {
+			return time.Time{}, err
+		}
+		// Found, offer to cache and return.
+		ts := timeutil.Unix(0, tsp.WallTime).UTC()
+		s.maybeAddCached(nodeID, ts)
+		return ts, nil
+	}
+	// Not found, add a negative cache hit.
+ s.maybeAddCached(nodeID, time.Time{}) + return time.Time{}, nil +} + +func (s *nodeTombstoneStorage) maybeAddCached(nodeID roachpb.NodeID, ts time.Time) (updated bool) { + s.mu.Lock() + defer s.mu.Unlock() + if oldTS, ok := s.mu.cache[nodeID]; !ok || oldTS.IsZero() { + if s.mu.cache == nil { + s.mu.cache = map[roachpb.NodeID]time.Time{} + } + s.mu.cache[nodeID] = ts + return true // updated + } + return false // not updated +} + +// SetDecommissioned marks a node as permanently removed +// from the cluster at the supplied approximate timestamp. +// +// Once a node is recorded as permanently removed, future +// invocations are ignored. +// +// Errors are not returned during normal operation. +func (s *nodeTombstoneStorage) SetDecommissioned( + ctx context.Context, nodeID roachpb.NodeID, ts time.Time, +) error { + if len(s.engs) == 0 { + return errors.New("no engines configured for nodeTombstoneStorage") + } + if ts.IsZero() { + return errors.New("can't mark as decommissioned at timestamp zero") + } + if !s.maybeAddCached(nodeID, ts.UTC()) { + // The cache already had a positive hit for this node, so don't do anything. + return nil + } + + // We've populated the cache, now write through to disk. + k := s.key(nodeID) + for _, eng := range s.engs { + var v roachpb.Value + if err := v.SetProto(&hlc.Timestamp{WallTime: ts.UnixNano()}); err != nil { + return err + } + + if err := storage.MVCCPut( + ctx, eng, nil /* MVCCStats */, k, hlc.Timestamp{}, v, nil, /* txn */ + ); err != nil { + return err + } + } + return nil +} diff --git a/pkg/server/node_tombstone_storage_test.go b/pkg/server/node_tombstone_storage_test.go new file mode 100644 index 000000000000..1675f5618fcc --- /dev/null +++ b/pkg/server/node_tombstone_storage_test.go @@ -0,0 +1,81 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestNodeTombstoneStorage(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + eng1 := storage.NewDefaultInMem() + defer eng1.Close() + eng2 := storage.NewDefaultInMem() + defer eng2.Close() + + mustTime := func(ts time.Time, err error) time.Time { + t.Helper() + require.NoError(t, err) + return ts + } + + s := &nodeTombstoneStorage{engs: []storage.Engine{eng1, eng2}} + // Empty storage has nobody decommissioned. + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 1))) + + // Decommission n2 at ts1. + ts1 := timeutil.Unix(10, 0).UTC() + require.NoError(t, s.SetDecommissioned(ctx, 2, ts1)) + // n1 is still active. + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 1))) + // n2 is decommissioned. + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + // Decommission n2 again, at older timestamp. + require.NoError(t, s.SetDecommissioned(ctx, 2, ts1.Add(-time.Second))) + // n2 is still decommissioned at ts1. + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + // Decommission n2 again, at newer timestamp. 
+ require.NoError(t, s.SetDecommissioned(ctx, 2, ts1.Add(time.Second))) + // n2 is still decommissioned at ts1. + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + + // Also decommission n1. + ts2 := timeutil.Unix(5, 0).UTC() + require.NoError(t, s.SetDecommissioned(ctx, 1, ts2)) + // n1 is decommissioned at ts2. + require.Equal(t, ts2, mustTime(s.IsDecommissioned(ctx, 1))) + + // n3 is not decommissioned. + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 3))) + + // We're not hitting the disks any more; the decommissioned + // status is cached. This includes both the decommissioned nodes + // and n3, which is not decommissioned but was checked above. + engs := s.engs + s.engs = nil + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + require.Equal(t, ts2, mustTime(s.IsDecommissioned(ctx, 1))) + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 3))) + + // If we recreate the cache, it rehydrates from disk. + s = &nodeTombstoneStorage{engs: engs} + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + require.Equal(t, ts2, mustTime(s.IsDecommissioned(ctx, 1))) + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 3))) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index bb9e002c2aa4..919cbcb594c0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -257,6 +257,31 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ctx := cfg.AmbientCtx.AnnotateCtx(context.Background()) + engines, err := cfg.CreateEngines(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to create engines") + } + stopper.AddCloser(&engines) + + nodeTombStorage := &nodeTombstoneStorage{engs: engines} + checkPingFor := func(ctx context.Context, nodeID roachpb.NodeID) error { + ts, err := nodeTombStorage.IsDecommissioned(ctx, nodeID) + if err != nil { + // An error here means something very basic is not working. Better to terminate + // than to limp along. + log.Fatalf(ctx, "unable to read decommissioned status for n%d: %v", nodeID, err) + } + if !ts.IsZero() { + // The node was decommissioned. + return grpcstatus.Errorf(codes.PermissionDenied, + "n%d was permanently removed from the cluster at %s; it is not allowed to rejoin the cluster", + nodeID, ts, + ) + } + // The common case - target node is not decommissioned. + return nil + } + rpcCtxOpts := rpc.ContextOptions{ TenantID: roachpb.SystemTenantID, AmbientCtx: cfg.AmbientCtx, @@ -265,12 +290,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { Stopper: stopper, Settings: cfg.Settings, OnSendPing: func(req *rpc.PingRequest) error { - // TODO(tbg): hook this up to a check for decommissioned nodes. - return nil + return checkPingFor(ctx, req.TargetNodeID) }, OnHandlePing: func(req *rpc.PingRequest) error { - // TODO(tbg): hook this up to a check for decommissioned nodes. 
- return nil + return checkPingFor(ctx, req.OriginNodeID) }} if knobs := cfg.TestingKnobs.Server; knobs != nil { serverKnobs := knobs.(*TestingKnobs) @@ -398,6 +421,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil { knobs.OnDecommissionedCallback(liveness) } + if err := nodeTombStorage.SetDecommissioned( + ctx, liveness.NodeID, timeutil.Unix(0, liveness.Expiration.WallTime).UTC(), + ); err != nil { + log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err) + } }, }) registry.AddMetricStruct(nodeLiveness.Metrics()) @@ -608,12 +636,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { debugServer := debug.NewServer(st, sqlServer.pgServer.HBADebugFn()) node.InitLogger(sqlServer.execCfg) - engines, err := cfg.CreateEngines(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to create engines") - } - stopper.AddCloser(&engines) - *lateBoundServer = Server{ nodeIDContainer: nodeIDContainer, cfg: cfg, From 40cdd5aecafb18c0a39f6892ac102ad150355882 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 14 Oct 2020 11:12:54 +0200 Subject: [PATCH 3/3] rpc: rename ping interceptors for clarity Release note: None --- pkg/rpc/context.go | 20 ++++++++++---------- pkg/rpc/context_test.go | 6 +++--- pkg/rpc/heartbeat.go | 2 +- pkg/server/server.go | 4 ++-- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 2cf5a4b8a676..f7c727488dba 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -378,15 +378,15 @@ type ContextOptions struct { Clock *hlc.Clock Stopper *stop.Stopper Settings *cluster.Settings - // OnHandlePing is called when handling a PingRequest, after + // OnIncomingPing is called when handling a PingRequest, after // preliminary checks but before recording clock offset information. // // It can inject an error. - OnHandlePing func(*PingRequest) error - // OnSendPing intercepts outgoing PingRequests. It may inject an + OnIncomingPing func(*PingRequest) error + // OnOutgoingPing intercepts outgoing PingRequests. It may inject an // error. - OnSendPing func(*PingRequest) error - Knobs ContextTestingKnobs + OnOutgoingPing func(*PingRequest) error + Knobs ContextTestingKnobs } func (c ContextOptions) validate() error { @@ -406,9 +406,9 @@ func (c ContextOptions) validate() error { return errors.New("Settings must be set") } - // NB: OnSendPing and OnHandlePing default to noops. + // NB: OnOutgoingPing and OnIncomingPing default to noops. // This is used both for testing and the cli. - _, _ = c.OnSendPing, c.OnHandlePing + _, _ = c.OnOutgoingPing, c.OnIncomingPing return nil } @@ -1138,7 +1138,7 @@ func (ctx *Context) runHeartbeat( // Permanent errors return an error from this method, which means that // the connection will be removed. Errors are presumed transient by // default, but some - like ClusterID or version mismatches, as well as - // PermissionDenied errors injected by OnSendPing, are considered permanent. + // PermissionDenied errors injected by OnOutgoingPing, are considered permanent. 
returnErr := false for { select { @@ -1163,7 +1163,7 @@ func (ctx *Context) runHeartbeat( } interceptor := func(*PingRequest) error { return nil } - if fn := ctx.OnSendPing; fn != nil { + if fn := ctx.OnOutgoingPing; fn != nil { interceptor = fn } @@ -1274,7 +1274,7 @@ func (ctx *Context) NewHeartbeatService() *HeartbeatService { clusterID: &ctx.ClusterID, nodeID: &ctx.NodeID, settings: ctx.Settings, - onHandlePing: ctx.OnHandlePing, + onHandlePing: ctx.OnIncomingPing, testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer, } } diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 3069e47c5f1f..c11dfd467e4a 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -162,7 +162,7 @@ func TestHeartbeatCB(t *testing.T) { }) } -// TestPingInterceptors checks that OnSendPing and OnHandlePing can inject errors. +// TestPingInterceptors checks that OnOutgoingPing and OnIncomingPing can inject errors. func TestPingInterceptors(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -181,13 +181,13 @@ func TestPingInterceptors(t *testing.T) { Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond), Stopper: stop.NewStopper(), Settings: cluster.MakeTestingClusterSettings(), - OnSendPing: func(req *PingRequest) error { + OnOutgoingPing: func(req *PingRequest) error { if req.TargetNodeID == blockedTargetNodeID { return errBoomSend } return nil }, - OnHandlePing: func(req *PingRequest) error { + OnIncomingPing: func(req *PingRequest) error { if req.OriginNodeID == blockedOriginNodeID { return errBoomRecv } diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 10df41746f8f..f0540bc31fb3 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -53,7 +53,7 @@ type HeartbeatService struct { clusterName string disableClusterNameVerification bool - onHandlePing func(*PingRequest) error // see ContextOptions.OnHandlePing + onHandlePing func(*PingRequest) error // see ContextOptions.OnIncomingPing // TestingAllowNamedRPCToAnonymousServer, when defined (in tests), // disables errors in case a heartbeat requests a specific node ID but diff --git a/pkg/server/server.go b/pkg/server/server.go index 919cbcb594c0..cdf55a145e7c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -289,10 +289,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { Clock: clock, Stopper: stopper, Settings: cfg.Settings, - OnSendPing: func(req *rpc.PingRequest) error { + OnOutgoingPing: func(req *rpc.PingRequest) error { return checkPingFor(ctx, req.TargetNodeID) }, - OnHandlePing: func(req *rpc.PingRequest) error { + OnIncomingPing: func(req *rpc.PingRequest) error { return checkPingFor(ctx, req.OriginNodeID) }} if knobs := cfg.TestingKnobs.Server; knobs != nil {
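
A note on the first patch, for readers skimming the key helpers: StoreNodeTombstoneKey and DecodeNodeTombstoneKey are inverses of each other, and the pretty-printed form is pinned down by the new printer_test.go case. A minimal, runnable sketch of the round trip (illustrative only, not part of the diffs):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	nodeID := roachpb.NodeID(123)
	// Encode: a store-local key with the "ntmb" suffix and the
	// node ID appended in ascending uint32 encoding. This is the
	// key that pretty-prints as /Local/Store/nodeTombstone/123.
	key := keys.StoreNodeTombstoneKey(nodeID)
	// Decode: recovers the node ID, or errors on a foreign suffix
	// or trailing garbage.
	decoded, err := keys.DecodeNodeTombstoneKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded == nodeID) // true
}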
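On the second patch: after the runHeartbeat changes, a connection to (or from) a decommissioned node eventually surfaces a gRPC PermissionDenied status as a permanent error rather than a transient heartbeat failure. The detection idiom below is extracted from TestDecommissionedNodeCannotConnect; the helper name is hypothetical, and the import aliases (errors = github.com/cockroachdb/errors, grpcstatus = google.golang.org/grpc/status, codes = google.golang.org/grpc/codes) match the test's:

// isPermanentDecommissionErr reports whether err (as returned from
// Connect or the heartbeat loop) is the permanent PermissionDenied
// refusal introduced by this patch. Illustrative helper, not in the diffs.
func isPermanentDecommissionErr(err error) bool {
	cause := errors.UnwrapAll(err) // strip the cockroachdb/errors wrapping
	s, ok := grpcstatus.FromError(cause)
	return ok && s.Code() == codes.PermissionDenied
}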
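And on the interceptor wiring that ties it together: OnOutgoingPing sees the dialed node's ID (TargetNodeID) while OnIncomingPing sees the caller's (OriginNodeID), which is why server.go feeds a different request field into the same tombstone check on each side. A trimmed sketch of that wiring (illustrative; checkPingFor stands in for the closure defined in NewServer, and the elided fields are as in that function):

rpcCtxOpts := rpc.ContextOptions{
	// ... TenantID, AmbientCtx, Clock, Stopper, Settings as in NewServer ...
	OnOutgoingPing: func(req *rpc.PingRequest) error {
		// Outgoing: refuse to dial a node known to be decommissioned.
		return checkPingFor(ctx, req.TargetNodeID)
	},
	OnIncomingPing: func(req *rpc.PingRequest) error {
		// Incoming: refuse pings originating from a decommissioned node.
		return checkPingFor(ctx, req.OriginNodeID)
	},
}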