Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server,rpc: block decommissioned node from interacting with the cluster #55286

Merged
merged 3 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ var (
// localStoreIdentSuffix stores an immutable identifier for this
// store, created when the store is first bootstrapped.
localStoreIdentSuffix = []byte("iden")
// localStoreNodeTombstoneSuffix stores key-value pairs that map
// nodeIDs to the time of their removal from the cluster.
localStoreNodeTombstoneSuffix = []byte("ntmb")
// localStoreLastUpSuffix stores the last timestamp that a store's node
// acknowledged that it was still running. This value will be regularly
// refreshed on all stores for a running node; the intention of this value
Expand Down
21 changes: 21 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ func StoreHLCUpperBoundKey() roachpb.Key {
return MakeStoreKey(localStoreHLCUpperBoundSuffix, nil)
}

// StoreNodeTombstoneKey builds the store-local key under which the tombstone
// for the given nodeID is kept. The key is the node tombstone suffix followed
// by the nodeID encoded as an ascending uint32.
func StoreNodeTombstoneKey(nodeID roachpb.NodeID) roachpb.Key {
	encodedID := encoding.EncodeUint32Ascending(nil, uint32(nodeID))
	return MakeStoreKey(localStoreNodeTombstoneSuffix, encodedID)
}

// DecodeNodeTombstoneKey returns the NodeID for the node tombstone.
//
// An error is returned if DecodeStoreKey rejects the key, if the key's suffix
// is not the node tombstone suffix, if the detail portion cannot be decoded as
// an ascending uint32, or if any bytes remain after the encoded node ID. The
// returned NodeID is zero whenever the error is non-nil.
func DecodeNodeTombstoneKey(key roachpb.Key) (roachpb.NodeID, error) {
	suffix, detail, err := DecodeStoreKey(key)
	if err != nil {
		return 0, err
	}
	if !suffix.Equal(localStoreNodeTombstoneSuffix) {
		return 0, errors.Errorf("key with suffix %q != %q", suffix, localStoreNodeTombstoneSuffix)
	}
	rest, nodeID, err := encoding.DecodeUint32Ascending(detail)
	if err != nil {
		// Report the decoding failure itself rather than letting it be masked
		// by the trailing-bytes check below.
		return 0, err
	}
	if len(rest) != 0 {
		return 0, errors.Errorf("invalid key has trailing garbage: %q", rest)
	}
	return roachpb.NodeID(nodeID), nil
}

// StoreSuggestedCompactionKey returns a store-local key for a
// suggested compaction. It combines the specified start and end keys.
func StoreSuggestedCompactionKey(start, end roachpb.Key) roachpb.Key {
Expand Down
17 changes: 15 additions & 2 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var constSubKeyDict = []struct {
{"/storeIdent", localStoreIdentSuffix},
{"/gossipBootstrap", localStoreGossipSuffix},
{"/clusterVersion", localStoreClusterVersionSuffix},
{"/nodeTombstone", localStoreNodeTombstoneSuffix},
{"/suggestedCompaction", localStoreSuggestedCompactionSuffix},
}

Expand All @@ -200,13 +201,25 @@ func suggestedCompactionKeyPrint(key roachpb.Key) string {
return fmt.Sprintf("{%s-%s}", start, end)
}

// nodeTombstoneKeyPrint renders the node ID stored in a node tombstone key.
// If the key cannot be decoded, the decoding error is rendered inside an
// "<invalid: ...>" marker instead.
func nodeTombstoneKeyPrint(key roachpb.Key) string {
	id, decodeErr := DecodeNodeTombstoneKey(key)
	if decodeErr == nil {
		return fmt.Sprint(id)
	}
	return fmt.Sprintf("<invalid: %s>", decodeErr)
}

func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
for _, v := range constSubKeyDict {
if bytes.HasPrefix(key, v.key) {
if v.key.Equal(localStoreSuggestedCompactionSuffix) {
return v.name + "/" + suggestedCompactionKeyPrint(
append(roachpb.Key(nil), append(localStorePrefix, key...)...),
)
} else if v.key.Equal(localStoreNodeTombstoneSuffix) {
return v.name + "/" + nodeTombstoneKeyPrint(
append(roachpb.Key(nil), append(localStorePrefix, key...)...),
)
}
return v.name
}
Expand All @@ -218,8 +231,8 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
func localStoreKeyParse(input string) (remainder string, output roachpb.Key) {
for _, s := range constSubKeyDict {
if strings.HasPrefix(input, s.name) {
if s.key.Equal(localStoreSuggestedCompactionSuffix) {
panic(&ErrUglifyUnsupported{errors.New("cannot parse suggested compaction key")})
if s.key.Equal(localStoreSuggestedCompactionSuffix) || s.key.Equal(localStoreNodeTombstoneSuffix) {
panic(&ErrUglifyUnsupported{errors.Errorf("cannot parse local store key with suffix %s", s.key)})
}
output = MakeStoreKey(s.key, nil)
return
Expand Down
3 changes: 2 additions & 1 deletion pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.StoreIdentKey(), "/Local/Store/storeIdent", revertSupportUnknown},
{keys.StoreGossipKey(), "/Local/Store/gossipBootstrap", revertSupportUnknown},
{keys.StoreClusterVersionKey(), "/Local/Store/clusterVersion", revertSupportUnknown},
{keys.StoreNodeTombstoneKey(123), "/Local/Store/nodeTombstone/123", revertSupportUnknown},
{keys.StoreSuggestedCompactionKey(keys.MinKey, roachpb.Key("b")), `/Local/Store/suggestedCompaction/{/Min-"b"}`, revertSupportUnknown},
{keys.StoreSuggestedCompactionKey(roachpb.Key("a"), roachpb.Key("b")), `/Local/Store/suggestedCompaction/{"a"-"b"}`, revertSupportUnknown},
{keys.StoreSuggestedCompactionKey(roachpb.Key("a"), keys.MaxKey), `/Local/Store/suggestedCompaction/{"a"-/Max}`, revertSupportUnknown},
Expand Down Expand Up @@ -314,7 +315,7 @@ exp: %s
t.Errorf("%d: ugly print expected unexpectedly unsupported (%s)", i, test.exp)
}
} else if exp, act := test.key, parsed; !bytes.Equal(exp, act) {
t.Errorf("%d: ugly print expected %q, got %q", i, exp, act)
t.Errorf("%d: ugly print expected '%q', got '%q'", i, exp, act)
}
if t.Failed() {
return
Expand Down
43 changes: 33 additions & 10 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ import (
"golang.org/x/sync/syncmap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
encodingproto "google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)

func init() {
Expand Down Expand Up @@ -376,15 +378,15 @@ type ContextOptions struct {
Clock *hlc.Clock
Stopper *stop.Stopper
Settings *cluster.Settings
// OnHandlePing is called when handling a PingRequest, after
// OnIncomingPing is called when handling a PingRequest, after
// preliminary checks but before recording clock offset information.
//
// It can inject an error.
OnHandlePing func(*PingRequest) error
// OnSendPing intercepts outgoing PingRequests. It may inject an
OnIncomingPing func(*PingRequest) error
// OnOutgoingPing intercepts outgoing PingRequests. It may inject an
// error.
OnSendPing func(*PingRequest) error
Knobs ContextTestingKnobs
OnOutgoingPing func(*PingRequest) error
Knobs ContextTestingKnobs
}

func (c ContextOptions) validate() error {
Expand All @@ -404,9 +406,9 @@ func (c ContextOptions) validate() error {
return errors.New("Settings must be set")
}

// NB: OnSendPing and OnHandlePing default to noops.
// NB: OnOutgoingPing and OnIncomingPing default to noops.
// This is used both for testing and the cli.
_, _ = c.OnSendPing, c.OnHandlePing
_, _ = c.OnOutgoingPing, c.OnIncomingPing

return nil
}
Expand Down Expand Up @@ -1131,6 +1133,13 @@ func (ctx *Context) runHeartbeat(
// Give the first iteration a wait-free heartbeat attempt.
heartbeatTimer.Reset(0)
everSucceeded := false
// Both transient and permanent errors can arise here. Transient errors
// set the `heartbeatResult.err` field but retain the connection.
// Permanent errors return an error from this method, which means that
// the connection will be removed. Errors are presumed transient by
// default, but some - like ClusterID or version mismatches, as well as
// PermissionDenied errors injected by OnOutgoingPing - are considered permanent.
returnErr := false
for {
select {
case <-redialChan:
Expand All @@ -1154,7 +1163,7 @@ func (ctx *Context) runHeartbeat(
}

interceptor := func(*PingRequest) error { return nil }
if fn := ctx.OnSendPing; fn != nil {
if fn := ctx.OnOutgoingPing; fn != nil {
interceptor = fn
}

Expand All @@ -1164,6 +1173,7 @@ func (ctx *Context) runHeartbeat(
// NB: We want the request to fail-fast (the default), otherwise we won't
// be notified of transport failures.
if err := interceptor(request); err != nil {
returnErr = true
return err
}
var err error
Expand All @@ -1177,9 +1187,13 @@ func (ctx *Context) runHeartbeat(
err = ping(goCtx)
}

if s, ok := grpcstatus.FromError(errors.UnwrapAll(err)); ok && s.Code() == codes.PermissionDenied {
returnErr = true
}

if err == nil {
// We verify the cluster name on the initiator side (instead
// of the hearbeat service side, as done for the cluster ID
// of the heartbeat service side, as done for the cluster ID
// and node ID checks) so that the operator who is starting a
// new node in a cluster and mistakenly joins the wrong
// cluster gets a chance to see the error message on their
Expand All @@ -1188,13 +1202,19 @@ func (ctx *Context) runHeartbeat(
err = errors.Wrap(
checkClusterName(ctx.Config.ClusterName, response.ClusterName),
"cluster name check failed on ping response")
if err != nil {
returnErr = true
}
}
}

if err == nil {
err = errors.Wrap(
checkVersion(goCtx, ctx.Settings, response.ServerVersion),
"version compatibility check failed on ping response")
if err != nil {
returnErr = true
}
}

if err == nil {
Expand Down Expand Up @@ -1232,6 +1252,9 @@ func (ctx *Context) runHeartbeat(
state = updateHeartbeatState(&ctx.metrics, state, hr.state())
conn.heartbeatResult.Store(hr)
setInitialHeartbeatDone()
if returnErr {
return err
}
return nil
}); err != nil {
return err
Expand All @@ -1251,7 +1274,7 @@ func (ctx *Context) NewHeartbeatService() *HeartbeatService {
clusterID: &ctx.ClusterID,
nodeID: &ctx.NodeID,
settings: ctx.Settings,
onHandlePing: ctx.OnHandlePing,
onHandlePing: ctx.OnIncomingPing,
testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer,
}
}
6 changes: 3 additions & 3 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestHeartbeatCB(t *testing.T) {
})
}

// TestPingInterceptors checks that OnSendPing and OnHandlePing can inject errors.
// TestPingInterceptors checks that OnOutgoingPing and OnIncomingPing can inject errors.
func TestPingInterceptors(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -181,13 +181,13 @@ func TestPingInterceptors(t *testing.T) {
Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond),
Stopper: stop.NewStopper(),
Settings: cluster.MakeTestingClusterSettings(),
OnSendPing: func(req *PingRequest) error {
OnOutgoingPing: func(req *PingRequest) error {
if req.TargetNodeID == blockedTargetNodeID {
return errBoomSend
}
return nil
},
OnHandlePing: func(req *PingRequest) error {
OnIncomingPing: func(req *PingRequest) error {
if req.OriginNodeID == blockedOriginNodeID {
return errBoomRecv
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type HeartbeatService struct {
clusterName string
disableClusterNameVerification bool

onHandlePing func(*PingRequest) error // see ContextOptions.OnHandlePing
onHandlePing func(*PingRequest) error // see ContextOptions.OnIncomingPing

// TestingAllowNamedRPCToAnonymousServer, when defined (in tests),
// disables errors in case a heartbeat requests a specific node ID but
Expand Down
58 changes: 58 additions & 0 deletions pkg/server/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -29,7 +31,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// TestClusterConnectivity sets up an uninitialized cluster with custom join
Expand Down Expand Up @@ -332,3 +337,56 @@ func TestJoinVersionGate(t *testing.T) {
t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error())
}
}

// TestDecommissionedNodeCannotConnect starts a three-node cluster, moves n3
// through DECOMMISSIONING to DECOMMISSIONED, and then checks that RPC
// connection attempts between n3 and each of the remaining nodes fail with a
// gRPC PermissionDenied status in both directions.
func TestDecommissionedNodeCannotConnect(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	const numNodes = 3
	tcArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual, // saves time
	}

	tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
	defer tc.Stopper().Stop(ctx)

	// Walk n3 through both membership states in order.
	for _, status := range []kvserverpb.MembershipStatus{
		kvserverpb.MembershipStatus_DECOMMISSIONING, kvserverpb.MembershipStatus_DECOMMISSIONED,
	} {
		require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3}))
	}

	// The checks are retried until they all pass: the expectation is that the
	// refusals take effect within a short period of time, not immediately.
	testutils.SucceedsSoon(t, func() error {
		for _, idx := range []int{0, 1} {
			clusterSrv := tc.Server(idx)
			decomSrv := tc.Server(2)

			// Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3.
			{
				_, err := clusterSrv.RPCContext().GRPCDialNode(
					decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
				).Connect(ctx)
				cause := errors.UnwrapAll(err)
				s, ok := grpcstatus.FromError(cause)
				if !ok || s.Code() != codes.PermissionDenied {
					return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
				}
			}

			// And similarly, n3 will be refused by n1, n2. The dial must name
			// the node that actually lives at clusterSrv's address as its
			// target; otherwise the ping is addressed to n3 itself and this
			// direction is not exercised.
			{
				_, err := decomSrv.RPCContext().GRPCDialNode(
					clusterSrv.RPCAddr(), clusterSrv.NodeID(), rpc.DefaultClass,
				).Connect(ctx)
				cause := errors.UnwrapAll(err)
				s, ok := grpcstatus.FromError(cause)
				if !ok || s.Code() != codes.PermissionDenied {
					return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
				}
			}
		}
		return nil
	})
}
Loading