diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 037381b014d8..528fc725217d 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -175,6 +175,9 @@ var ( // localStoreIdentSuffix stores an immutable identifier for this // store, created when the store is first bootstrapped. localStoreIdentSuffix = []byte("iden") + // localStoreNodeTombstoneSuffix stores key value pairs that map + // nodeIDs to time of removal from cluster. + localStoreNodeTombstoneSuffix = []byte("ntmb") // localStoreLastUpSuffix stores the last timestamp that a store's node // acknowledged that it was still running. This value will be regularly // refreshed on all stores for a running node; the intention of this value diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index 5948d4b26050..27b4efbc66a1 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -76,6 +76,27 @@ func StoreHLCUpperBoundKey() roachpb.Key { return MakeStoreKey(localStoreHLCUpperBoundSuffix, nil) } +// StoreNodeTombstoneKey returns the key for storing a node tombstone for nodeID. +func StoreNodeTombstoneKey(nodeID roachpb.NodeID) roachpb.Key { + return MakeStoreKey(localStoreNodeTombstoneSuffix, encoding.EncodeUint32Ascending(nil, uint32(nodeID))) +} + +// DecodeNodeTombstoneKey returns the NodeID for the node tombstone. +func DecodeNodeTombstoneKey(key roachpb.Key) (roachpb.NodeID, error) { + suffix, detail, err := DecodeStoreKey(key) + if err != nil { + return 0, err + } + if !suffix.Equal(localStoreNodeTombstoneSuffix) { + return 0, errors.Errorf("key with suffix %q != %q", suffix, localStoreNodeTombstoneSuffix) + } + detail, nodeID, err := encoding.DecodeUint32Ascending(detail) + if len(detail) != 0 { + return 0, errors.Errorf("invalid key has trailing garbage: %q", detail) + } + return roachpb.NodeID(nodeID), err +} + // StoreSuggestedCompactionKey returns a store-local key for a // suggested compaction. It combines the specified start and end keys. 
func StoreSuggestedCompactionKey(start, end roachpb.Key) roachpb.Key { diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index 0725897e2b22..22ae1cb78997 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -189,6 +189,7 @@ var constSubKeyDict = []struct { {"/storeIdent", localStoreIdentSuffix}, {"/gossipBootstrap", localStoreGossipSuffix}, {"/clusterVersion", localStoreClusterVersionSuffix}, + {"/nodeTombstone", localStoreNodeTombstoneSuffix}, {"/suggestedCompaction", localStoreSuggestedCompactionSuffix}, } @@ -200,6 +201,14 @@ func suggestedCompactionKeyPrint(key roachpb.Key) string { return fmt.Sprintf("{%s-%s}", start, end) } +func nodeTombstoneKeyPrint(key roachpb.Key) string { + nodeID, err := DecodeNodeTombstoneKey(key) + if err != nil { + return fmt.Sprintf("<invalid: %s>", err) + } + return fmt.Sprint(nodeID) +} + func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string { for _, v := range constSubKeyDict { if bytes.HasPrefix(key, v.key) { @@ -207,6 +216,10 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string { return v.name + "/" + suggestedCompactionKeyPrint( append(roachpb.Key(nil), append(localStorePrefix, key...)...), ) + } else if v.key.Equal(localStoreNodeTombstoneSuffix) { + return v.name + "/" + nodeTombstoneKeyPrint( + append(roachpb.Key(nil), append(localStorePrefix, key...)...), + ) } return v.name } @@ -218,8 +231,8 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string { func localStoreKeyParse(input string) (remainder string, output roachpb.Key) { for _, s := range constSubKeyDict { if strings.HasPrefix(input, s.name) { - if s.key.Equal(localStoreSuggestedCompactionSuffix) { - panic(&ErrUglifyUnsupported{errors.New("cannot parse suggested compaction key")}) + if s.key.Equal(localStoreSuggestedCompactionSuffix) || s.key.Equal(localStoreNodeTombstoneSuffix) { + panic(&ErrUglifyUnsupported{errors.Errorf("cannot parse local store key with suffix %s", s.key)}) } output = 
MakeStoreKey(s.key, nil) return diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index 0d7a13a3729c..990c7bf2d6ee 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -59,6 +59,7 @@ func TestPrettyPrint(t *testing.T) { {keys.StoreIdentKey(), "/Local/Store/storeIdent", revertSupportUnknown}, {keys.StoreGossipKey(), "/Local/Store/gossipBootstrap", revertSupportUnknown}, {keys.StoreClusterVersionKey(), "/Local/Store/clusterVersion", revertSupportUnknown}, + {keys.StoreNodeTombstoneKey(123), "/Local/Store/nodeTombstone/123", revertSupportUnknown}, {keys.StoreSuggestedCompactionKey(keys.MinKey, roachpb.Key("b")), `/Local/Store/suggestedCompaction/{/Min-"b"}`, revertSupportUnknown}, {keys.StoreSuggestedCompactionKey(roachpb.Key("a"), roachpb.Key("b")), `/Local/Store/suggestedCompaction/{"a"-"b"}`, revertSupportUnknown}, {keys.StoreSuggestedCompactionKey(roachpb.Key("a"), keys.MaxKey), `/Local/Store/suggestedCompaction/{"a"-/Max}`, revertSupportUnknown}, @@ -314,7 +315,7 @@ exp: %s t.Errorf("%d: ugly print expected unexpectedly unsupported (%s)", i, test.exp) } } else if exp, act := test.key, parsed; !bytes.Equal(exp, act) { - t.Errorf("%d: ugly print expected %q, got %q", i, exp, act) + t.Errorf("%d: ugly print expected '%q', got '%q'", i, exp, act) } if t.Failed() { return diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 9bfbd132532e..f7c727488dba 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -45,10 +45,12 @@ import ( "golang.org/x/sync/syncmap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" encodingproto "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/metadata" + grpcstatus "google.golang.org/grpc/status" ) func init() { @@ -376,15 +378,15 @@ type ContextOptions struct { Clock *hlc.Clock Stopper *stop.Stopper Settings *cluster.Settings - // OnHandlePing is called when handling a 
PingRequest, after + // OnIncomingPing is called when handling a PingRequest, after // preliminary checks but before recording clock offset information. // // It can inject an error. - OnHandlePing func(*PingRequest) error - // OnSendPing intercepts outgoing PingRequests. It may inject an + OnIncomingPing func(*PingRequest) error + // OnOutgoingPing intercepts outgoing PingRequests. It may inject an // error. - OnSendPing func(*PingRequest) error - Knobs ContextTestingKnobs + OnOutgoingPing func(*PingRequest) error + Knobs ContextTestingKnobs } func (c ContextOptions) validate() error { @@ -404,9 +406,9 @@ func (c ContextOptions) validate() error { return errors.New("Settings must be set") } - // NB: OnSendPing and OnHandlePing default to noops. + // NB: OnOutgoingPing and OnIncomingPing default to noops. // This is used both for testing and the cli. - _, _ = c.OnSendPing, c.OnHandlePing + _, _ = c.OnOutgoingPing, c.OnIncomingPing return nil } @@ -1131,6 +1133,13 @@ func (ctx *Context) runHeartbeat( // Give the first iteration a wait-free heartbeat attempt. heartbeatTimer.Reset(0) everSucceeded := false + // Both transient and permanent errors can arise here. Transient errors + // set the `heartbeatResult.err` field but retain the connection. + // Permanent errors return an error from this method, which means that + // the connection will be removed. Errors are presumed transient by + // default, but some - like ClusterID or version mismatches, as well as + // PermissionDenied errors injected by OnOutgoingPing, are considered permanent. + returnErr := false for { select { case <-redialChan: @@ -1154,7 +1163,7 @@ func (ctx *Context) runHeartbeat( } interceptor := func(*PingRequest) error { return nil } - if fn := ctx.OnSendPing; fn != nil { + if fn := ctx.OnOutgoingPing; fn != nil { interceptor = fn } @@ -1164,6 +1173,7 @@ func (ctx *Context) runHeartbeat( // NB: We want the request to fail-fast (the default), otherwise we won't // be notified of transport failures. 
if err := interceptor(request); err != nil { + returnErr = true return err } var err error @@ -1177,9 +1187,13 @@ func (ctx *Context) runHeartbeat( err = ping(goCtx) } + if s, ok := grpcstatus.FromError(errors.UnwrapAll(err)); ok && s.Code() == codes.PermissionDenied { + returnErr = true + } + if err == nil { // We verify the cluster name on the initiator side (instead - // of the hearbeat service side, as done for the cluster ID + // of the heartbeat service side, as done for the cluster ID // and node ID checks) so that the operator who is starting a // new node in a cluster and mistakenly joins the wrong // cluster gets a chance to see the error message on their @@ -1188,6 +1202,9 @@ func (ctx *Context) runHeartbeat( err = errors.Wrap( checkClusterName(ctx.Config.ClusterName, response.ClusterName), "cluster name check failed on ping response") + if err != nil { + returnErr = true + } } } @@ -1195,6 +1212,9 @@ func (ctx *Context) runHeartbeat( err = errors.Wrap( checkVersion(goCtx, ctx.Settings, response.ServerVersion), "version compatibility check failed on ping response") + if err != nil { + returnErr = true + } } if err == nil { @@ -1232,6 +1252,9 @@ func (ctx *Context) runHeartbeat( state = updateHeartbeatState(&ctx.metrics, state, hr.state()) conn.heartbeatResult.Store(hr) setInitialHeartbeatDone() + if returnErr { + return err + } return nil }); err != nil { return err @@ -1251,7 +1274,7 @@ func (ctx *Context) NewHeartbeatService() *HeartbeatService { clusterID: &ctx.ClusterID, nodeID: &ctx.NodeID, settings: ctx.Settings, - onHandlePing: ctx.OnHandlePing, + onHandlePing: ctx.OnIncomingPing, testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer, } } diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 3069e47c5f1f..c11dfd467e4a 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -162,7 +162,7 @@ func TestHeartbeatCB(t *testing.T) { }) } -// TestPingInterceptors checks that OnSendPing and 
OnHandlePing can inject errors. +// TestPingInterceptors checks that OnOutgoingPing and OnIncomingPing can inject errors. func TestPingInterceptors(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -181,13 +181,13 @@ func TestPingInterceptors(t *testing.T) { Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond), Stopper: stop.NewStopper(), Settings: cluster.MakeTestingClusterSettings(), - OnSendPing: func(req *PingRequest) error { + OnOutgoingPing: func(req *PingRequest) error { if req.TargetNodeID == blockedTargetNodeID { return errBoomSend } return nil }, - OnHandlePing: func(req *PingRequest) error { + OnIncomingPing: func(req *PingRequest) error { if req.OriginNodeID == blockedOriginNodeID { return errBoomRecv } diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 10df41746f8f..f0540bc31fb3 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -53,7 +53,7 @@ type HeartbeatService struct { clusterName string disableClusterNameVerification bool - onHandlePing func(*PingRequest) error // see ContextOptions.OnHandlePing + onHandlePing func(*PingRequest) error // see ContextOptions.OnIncomingPing // TestingAllowNamedRPCToAnonymousServer, when defined (in tests), // disables errors in case a heartbeat requests a specific node ID but diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go index dc6fbe12fbf4..f8ecd9296508 100644 --- a/pkg/server/connectivity_test.go +++ b/pkg/server/connectivity_test.go @@ -20,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -29,7 +31,10 @@ import ( 
"github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" ) // TestClusterConnectivity sets up an uninitialized cluster with custom join @@ -332,3 +337,56 @@ func TestJoinVersionGate(t *testing.T) { t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error()) } } + +func TestDecommissionedNodeCannotConnect(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + numNodes := 3 + tcArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // saves time + } + + tc := testcluster.StartTestCluster(t, numNodes, tcArgs) + defer tc.Stopper().Stop(ctx) + + for _, status := range []kvserverpb.MembershipStatus{ + kvserverpb.MembershipStatus_DECOMMISSIONING, kvserverpb.MembershipStatus_DECOMMISSIONED, + } { + require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3})) + } + + testutils.SucceedsSoon(t, func() error { + for _, idx := range []int{0, 1} { + clusterSrv := tc.Server(idx) + decomSrv := tc.Server(2) + + // Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3. + { + _, err := clusterSrv.RPCContext().GRPCDialNode( + decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass, + ).Connect(ctx) + cause := errors.UnwrapAll(err) + s, ok := grpcstatus.FromError(cause) + if !ok || s.Code() != codes.PermissionDenied { + return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err) + } + } + + // And similarly, n3 will be refused by n1, n2. 
+ { + _, err := decomSrv.RPCContext().GRPCDialNode( + clusterSrv.RPCAddr(), clusterSrv.NodeID(), rpc.DefaultClass, + ).Connect(ctx) + cause := errors.UnwrapAll(err) + s, ok := grpcstatus.FromError(cause) + if !ok || s.Code() != codes.PermissionDenied { + return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err) + } + } + } + return nil + }) +} diff --git a/pkg/server/node_tombstone_storage.go b/pkg/server/node_tombstone_storage.go new file mode 100644 index 000000000000..2513731d83c8 --- /dev/null +++ b/pkg/server/node_tombstone_storage.go @@ -0,0 +1,132 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// nodeTombstoneStorage maintains a local (i.e. unreplicated) +// registry of which nodes were permanently removed from the cluster. +type nodeTombstoneStorage struct { + engs []storage.Engine + mu struct { + syncutil.RWMutex + // cache contains both positive and negative hits. Positive hits + // unconditionally override negative hits. + cache map[roachpb.NodeID]time.Time + } +} + +func (s *nodeTombstoneStorage) key(nodeID roachpb.NodeID) roachpb.Key { + return keys.StoreNodeTombstoneKey(nodeID) +} + +// IsDecommissioned returns when (in UTC) a node was decommissioned +// (i.e. 
was permanently removed from the cluster). If not removed, +// returns the zero time. +// +// Errors are not returned during normal operation. +func (s *nodeTombstoneStorage) IsDecommissioned( + ctx context.Context, nodeID roachpb.NodeID, +) (time.Time, error) { + s.mu.RLock() + ts, ok := s.mu.cache[nodeID] + s.mu.RUnlock() + if ok { + // Cache hit. + return ts, nil + } + + // No cache hit. + k := s.key(nodeID) + for _, eng := range s.engs { + v, _, err := storage.MVCCGet(ctx, eng, k, hlc.Timestamp{}, storage.MVCCGetOptions{}) + if err != nil { + return time.Time{}, err + } + if v == nil { + // Not found. + continue + } + var tsp hlc.Timestamp + if err := v.GetProto(&tsp); err != nil { + return time.Time{}, err + } + // Found, offer to cache and return. + ts := timeutil.Unix(0, tsp.WallTime).UTC() + s.maybeAddCached(nodeID, ts) + return ts, nil + } + // Not found, add a negative cache hit. + s.maybeAddCached(nodeID, time.Time{}) + return time.Time{}, nil +} + +func (s *nodeTombstoneStorage) maybeAddCached(nodeID roachpb.NodeID, ts time.Time) (updated bool) { + s.mu.Lock() + defer s.mu.Unlock() + if oldTS, ok := s.mu.cache[nodeID]; !ok || oldTS.IsZero() { + if s.mu.cache == nil { + s.mu.cache = map[roachpb.NodeID]time.Time{} + } + s.mu.cache[nodeID] = ts + return true // updated + } + return false // not updated +} + +// SetDecommissioned marks a node as permanently removed +// from the cluster at the supplied approximate timestamp. +// +// Once a node is recorded as permanently removed, future +// invocations are ignored. +// +// Errors are not returned during normal operation. 
+func (s *nodeTombstoneStorage) SetDecommissioned( + ctx context.Context, nodeID roachpb.NodeID, ts time.Time, +) error { + if len(s.engs) == 0 { + return errors.New("no engines configured for nodeTombstoneStorage") + } + if ts.IsZero() { + return errors.New("can't mark as decommissioned at timestamp zero") + } + if !s.maybeAddCached(nodeID, ts.UTC()) { + // The cache already had a positive hit for this node, so don't do anything. + return nil + } + + // We've populated the cache, now write through to disk. + k := s.key(nodeID) + for _, eng := range s.engs { + var v roachpb.Value + if err := v.SetProto(&hlc.Timestamp{WallTime: ts.UnixNano()}); err != nil { + return err + } + + if err := storage.MVCCPut( + ctx, eng, nil /* MVCCStats */, k, hlc.Timestamp{}, v, nil, /* txn */ + ); err != nil { + return err + } + } + return nil +} diff --git a/pkg/server/node_tombstone_storage_test.go b/pkg/server/node_tombstone_storage_test.go new file mode 100644 index 000000000000..1675f5618fcc --- /dev/null +++ b/pkg/server/node_tombstone_storage_test.go @@ -0,0 +1,81 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package server + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestNodeTombstoneStorage(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + eng1 := storage.NewDefaultInMem() + defer eng1.Close() + eng2 := storage.NewDefaultInMem() + defer eng2.Close() + + mustTime := func(ts time.Time, err error) time.Time { + t.Helper() + require.NoError(t, err) + return ts + } + + s := &nodeTombstoneStorage{engs: []storage.Engine{eng1, eng2}} + // Empty storage has nobody decommissioned. + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 1))) + + // Decommission n2 at ts1. + ts1 := timeutil.Unix(10, 0).UTC() + require.NoError(t, s.SetDecommissioned(ctx, 2, ts1)) + // n1 is still active. + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 1))) + // n2 is decommissioned. + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + // Decommission n2 again, at older timestamp. + require.NoError(t, s.SetDecommissioned(ctx, 2, ts1.Add(-time.Second))) + // n2 is still decommissioned at ts1. + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + // Decommission n2 again, at newer timestamp. + require.NoError(t, s.SetDecommissioned(ctx, 2, ts1.Add(time.Second))) + // n2 is still decommissioned at ts1. + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + + // Also decommission n1. + ts2 := timeutil.Unix(5, 0).UTC() + require.NoError(t, s.SetDecommissioned(ctx, 1, ts2)) + // n1 is decommissioned at ts2. + require.Equal(t, ts2, mustTime(s.IsDecommissioned(ctx, 1))) + + // n3 is not decommissioned. + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 3))) + + // We're not hitting the disks any more; the decommissioned + // status is cached. 
This includes both the decommissioned nodes + // and n3, which is not decommissioned but was checked above. + engs := s.engs + s.engs = nil + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + require.Equal(t, ts2, mustTime(s.IsDecommissioned(ctx, 1))) + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 3))) + + // If we recreate the cache, it rehydrates from disk. + s = &nodeTombstoneStorage{engs: engs} + require.Equal(t, ts1, mustTime(s.IsDecommissioned(ctx, 2))) + require.Equal(t, ts2, mustTime(s.IsDecommissioned(ctx, 1))) + require.Equal(t, time.Time{}, mustTime(s.IsDecommissioned(ctx, 3))) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index bb9e002c2aa4..cdf55a145e7c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -257,6 +257,31 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ctx := cfg.AmbientCtx.AnnotateCtx(context.Background()) + engines, err := cfg.CreateEngines(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to create engines") + } + stopper.AddCloser(&engines) + + nodeTombStorage := &nodeTombstoneStorage{engs: engines} + checkPingFor := func(ctx context.Context, nodeID roachpb.NodeID) error { + ts, err := nodeTombStorage.IsDecommissioned(ctx, nodeID) + if err != nil { + // An error here means something very basic is not working. Better to terminate + // than to limp along. + log.Fatalf(ctx, "unable to read decommissioned status for n%d: %v", nodeID, err) + } + if !ts.IsZero() { + // The node was decommissioned. + return grpcstatus.Errorf(codes.PermissionDenied, + "n%d was permanently removed from the cluster at %s; it is not allowed to rejoin the cluster", + nodeID, ts, + ) + } + // The common case - target node is not decommissioned. 
+ return nil + } + rpcCtxOpts := rpc.ContextOptions{ TenantID: roachpb.SystemTenantID, AmbientCtx: cfg.AmbientCtx, @@ -264,13 +289,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { Clock: clock, Stopper: stopper, Settings: cfg.Settings, - OnSendPing: func(req *rpc.PingRequest) error { - // TODO(tbg): hook this up to a check for decommissioned nodes. - return nil + OnOutgoingPing: func(req *rpc.PingRequest) error { + return checkPingFor(ctx, req.TargetNodeID) }, - OnHandlePing: func(req *rpc.PingRequest) error { - // TODO(tbg): hook this up to a check for decommissioned nodes. - return nil + OnIncomingPing: func(req *rpc.PingRequest) error { + return checkPingFor(ctx, req.OriginNodeID) }} if knobs := cfg.TestingKnobs.Server; knobs != nil { serverKnobs := knobs.(*TestingKnobs) @@ -398,6 +421,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil { knobs.OnDecommissionedCallback(liveness) } + if err := nodeTombStorage.SetDecommissioned( + ctx, liveness.NodeID, timeutil.Unix(0, liveness.Expiration.WallTime).UTC(), + ); err != nil { + log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err) + } }, }) registry.AddMetricStruct(nodeLiveness.Metrics()) @@ -608,12 +636,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { debugServer := debug.NewServer(st, sqlServer.pgServer.HBADebugFn()) node.InitLogger(sqlServer.execCfg) - engines, err := cfg.CreateEngines(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to create engines") - } - stopper.AddCloser(&engines) - *lateBoundServer = Server{ nodeIDContainer: nodeIDContainer, cfg: cfg,