Skip to content

Commit eedee18

Browse files
craig[bot]knz
andcommitted
Merge #34197 #36952
34197: server,rpc: validate node IDs in RPC heartbeats r=tbg a=knz Fixes #34158. Prior to this patch, it was possible for a RPC client to dial a node ID and get a connection to another node instead. This is because the mapping of node ID -> address may be stale, and a different node could take the address of the intended node from "under" the dialer. (See #34155 for a scenario.) This happened to be "safe" in many cases where it matters because: - RPC requests for distSQL are OK with being served on a different node than intended (with potential performance drop); - RPC requests to the KV layer are OK with being served on a different node than intended (they would route underneath); - RPC requests to the storage layer are rejected by the remote node because the store ID in the request would not match. However this safety is largely accidental, and we should not work with the assumption that any RPC request is safe to be mis-routed. (In fact, we have not audited all the RPC endpoints and cannot establish this safety exists throughout.) This patch works to prevent these mis-routings by adding a check of the intended node ID during RPC heartbeats (including the initial heartbeat), when the intended node ID is known. A new API `GRPCDialNode()` is introduced to establish such connections. Release note (bug fix): CockroachDB now performs fewer attempts to communicate with the wrong node, when a node is restarted with another node's address. 36952: storage: deflake TestNodeLivenessStatusMap r=tbg a=knz Fixes #35675. Prior to this patch, this test would fail `stressrace` after a few dozen iterations. With this patch, `stressrace` succeeds thousands of iterations. I have checked that the test logic is preserved: if I change one of the expected statuses in `testData`, the test still fail properly. Release note: None Co-authored-by: Raphael 'kena' Poss <knz@cockroachlabs.com>
3 parents 9335ce0 + 6641499 + af6a6e8 commit eedee18

32 files changed

+471
-169
lines changed

pkg/cli/debug_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ func TestRemoveDeadReplicas(t *testing.T) {
252252
tc := testcluster.StartTestCluster(t, 3, clusterArgs)
253253
defer tc.Stopper().Stop(ctx)
254254

255-
grpcConn, err := tc.Server(0).RPCContext().GRPCDial(tc.Server(0).ServingAddr()).Connect(ctx)
255+
grpcConn, err := tc.Server(0).RPCContext().GRPCDialNode(
256+
tc.Server(0).ServingAddr(),
257+
tc.Server(0).NodeID(),
258+
).Connect(ctx)
256259
if err != nil {
257260
t.Fatal(err)
258261
}

pkg/cli/start.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1132,7 +1132,9 @@ func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, func(
11321132
stopper.Stop(ctx)
11331133
return nil, nil, nil, err
11341134
}
1135-
conn, err := rpcContext.GRPCDial(addr).Connect(ctx)
1135+
// We use GRPCUnvalidatedDial() here because it does not matter
1136+
// to which node we're talking to.
1137+
conn, err := rpcContext.GRPCUnvalidatedDial(addr).Connect(ctx)
11361138
if err != nil {
11371139
stopper.Stop(ctx)
11381140
return nil, nil, nil, err

pkg/gossip/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (c *client) startLocked(
109109
// asynchronous from the caller's perspective, so the only effect of
110110
// `WithBlock` here is blocking shutdown - at the time of this writing,
111111
// that ends ups up making `kv` tests take twice as long.
112-
conn, err := rpcCtx.GRPCDial(c.addr.String()).Connect(ctx)
112+
conn, err := rpcCtx.GRPCUnvalidatedDial(c.addr.String()).Connect(ctx)
113113
if err != nil {
114114
return err
115115
}

pkg/gossip/client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func startGossipAtAddr(
6666
registry *metric.Registry,
6767
) *Gossip {
6868
rpcContext := newInsecureRPCContext(stopper)
69+
rpcContext.NodeID.Set(context.TODO(), nodeID)
6970
server := rpc.NewServer(rpcContext)
7071
g := NewTest(nodeID, rpcContext, server, stopper, registry)
7172
ln, err := netutil.ListenAndServeGRPC(stopper, server, addr)

pkg/gossip/gossip_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ func TestGossipInfoStore(t *testing.T) {
6161
}
6262

6363
// TestGossipMoveNode verifies that if a node is moved to a new address, it
64-
// gets properly updated in gossip (including that any other node that was
65-
// previously at that address gets removed from the cluster).
64+
// gets properly updated in gossip.
6665
func TestGossipMoveNode(t *testing.T) {
6766
defer leaktest.AfterTest(t)()
6867
stopper := stop.NewStopper()
@@ -462,7 +461,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
462461
c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, local.GetNodeAddr(), makeMetrics())
463462

464463
testutils.SucceedsSoon(t, func() error {
465-
conn, err := peer.rpcContext.GRPCDial(c.addr.String()).Connect(ctx)
464+
conn, err := peer.rpcContext.GRPCUnvalidatedDial(c.addr.String()).Connect(ctx)
466465
if err != nil {
467466
return err
468467
}

pkg/kv/send_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ func TestSendToOneClient(t *testing.T) {
6666
stopper,
6767
&cluster.MakeTestingClusterSettings().Version,
6868
)
69+
70+
// This test uses the testing function sendBatch() which does not
71+
// support setting the node ID on GRPCDialNode(). Disable Node ID
72+
// checks to avoid log.Fatal.
73+
rpcContext.TestingAllowNamedRPCToAnonymousServer = true
74+
6975
s := rpc.NewServer(rpcContext)
7076
roachpb.RegisterInternalServer(s, Node(0))
7177
ln, err := netutil.ListenAndServeGRPC(rpcContext.Stopper, s, util.TestAddr)
@@ -136,6 +142,10 @@ func TestComplexScenarios(t *testing.T) {
136142
stopper,
137143
&cluster.MakeTestingClusterSettings().Version,
138144
)
145+
146+
// We're going to serve multiple node IDs with that one
147+
// context. Disable node ID checks.
148+
nodeContext.TestingAllowNamedRPCToAnonymousServer = true
139149
nodeDialer := nodedialer.New(nodeContext, nil)
140150

141151
// TODO(bdarnell): the retryable flag is no longer used for RPC errors.

pkg/rpc/context.go

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -276,10 +276,12 @@ func NewServerWithInterceptor(
276276

277277
s := grpc.NewServer(opts...)
278278
RegisterHeartbeatServer(s, &HeartbeatService{
279-
clock: ctx.LocalClock,
280-
remoteClockMonitor: ctx.RemoteClocks,
281-
clusterID: &ctx.ClusterID,
282-
version: ctx.version,
279+
clock: ctx.LocalClock,
280+
remoteClockMonitor: ctx.RemoteClocks,
281+
clusterID: &ctx.ClusterID,
282+
nodeID: &ctx.NodeID,
283+
version: ctx.version,
284+
testingAllowNamedRPCToAnonymousServer: ctx.TestingAllowNamedRPCToAnonymousServer,
283285
})
284286
return s
285287
}
@@ -298,14 +300,20 @@ type Connection struct {
298300
initialHeartbeatDone chan struct{} // closed after first heartbeat
299301
stopper *stop.Stopper
300302

303+
// remoteNodeID implies checking the remote node ID. 0 when unknown,
304+
// non-zero to check with remote node. This is constant throughout
305+
// the lifetime of a Connection object.
306+
remoteNodeID roachpb.NodeID
307+
301308
initOnce sync.Once
302309
validatedOnce sync.Once
303310
}
304311

305-
func newConnection(stopper *stop.Stopper) *Connection {
312+
func newConnectionToNodeID(stopper *stop.Stopper, remoteNodeID roachpb.NodeID) *Connection {
306313
c := &Connection{
307314
initialHeartbeatDone: make(chan struct{}),
308315
stopper: stopper,
316+
remoteNodeID: remoteNodeID,
309317
}
310318
c.heartbeatResult.Store(heartbeatResult{err: ErrNotHeartbeated})
311319
return c
@@ -372,11 +380,23 @@ type Context struct {
372380
stats StatsHandler
373381

374382
ClusterID base.ClusterIDContainer
383+
NodeID base.NodeIDContainer
375384
version *cluster.ExposedClusterVersion
376385

377386
// For unittesting.
378387
BreakerFactory func() *circuit.Breaker
379388
testingDialOpts []grpc.DialOption
389+
390+
// For testing. See the comment on the same field in HeartbeatService.
391+
TestingAllowNamedRPCToAnonymousServer bool
392+
}
393+
394+
// connKey is used as key in the Context.conns map. Different remote
395+
// node IDs get different *Connection objects, to ensure that we don't
396+
// mis-route RPC requests.
397+
type connKey struct {
398+
targetAddr string
399+
nodeID roachpb.NodeID
380400
}
381401

382402
// NewContext creates an rpc Context with the supplied values.
@@ -422,7 +442,7 @@ func NewContext(
422442
conn.dialErr = &roachpb.NodeUnavailableError{}
423443
}
424444
})
425-
ctx.removeConn(k.(string), conn)
445+
ctx.removeConn(k.(connKey), conn)
426446
return true
427447
})
428448
})
@@ -439,8 +459,10 @@ func (ctx *Context) GetStatsMap() *syncmap.Map {
439459

440460
// GetLocalInternalClientForAddr returns the context's internal batch client
441461
// for target, if it exists.
442-
func (ctx *Context) GetLocalInternalClientForAddr(target string) roachpb.InternalClient {
443-
if target == ctx.AdvertiseAddr {
462+
func (ctx *Context) GetLocalInternalClientForAddr(
463+
target string, nodeID roachpb.NodeID,
464+
) roachpb.InternalClient {
465+
if target == ctx.AdvertiseAddr && nodeID == ctx.NodeID.Get() {
444466
return ctx.localInternalClient
445467
}
446468
return nil
@@ -544,15 +566,15 @@ func (ctx *Context) SetLocalInternalServer(internalServer roachpb.InternalServer
544566
ctx.localInternalClient = internalClientAdapter{internalServer}
545567
}
546568

547-
func (ctx *Context) removeConn(key string, conn *Connection) {
569+
func (ctx *Context) removeConn(key connKey, conn *Connection) {
548570
ctx.conns.Delete(key)
549571
if log.V(1) {
550-
log.Infof(ctx.masterCtx, "closing %s", key)
572+
log.Infof(ctx.masterCtx, "closing %+v", key)
551573
}
552574
if grpcConn := conn.grpcConn; grpcConn != nil {
553575
if err := grpcConn.Close(); err != nil && !grpcutil.IsClosedConnection(err) {
554576
if log.V(1) {
555-
log.Errorf(ctx.masterCtx, "failed to close client connection: %s", err)
577+
log.Errorf(ctx.masterCtx, "failed to close client connection: %v", err)
556578
}
557579
}
558580
}
@@ -675,11 +697,43 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{
675697
return conn, dialer.redialChan, err
676698
}
677699

678-
// GRPCDial calls grpc.Dial with options appropriate for the context.
679-
func (ctx *Context) GRPCDial(target string) *Connection {
680-
value, ok := ctx.conns.Load(target)
700+
// GRPCUnvalidatedDial uses GRPCDialNode and disables validation of the
701+
// node ID between client and server. This function should only be
702+
// used with the gossip client and CLI commands which can talk to any
703+
// node.
704+
func (ctx *Context) GRPCUnvalidatedDial(target string) *Connection {
705+
return ctx.grpcDialNodeInternal(target, 0)
706+
}
707+
708+
// GRPCDialNode calls grpc.Dial with options appropriate for the context.
709+
//
710+
// The remoteNodeID becomes a constraint on the expected node ID of
711+
// the remote node; this is checked during heartbeats. The caller is
712+
// responsible for ensuring the remote node ID is known prior to using
713+
// this function.
714+
func (ctx *Context) GRPCDialNode(target string, remoteNodeID roachpb.NodeID) *Connection {
715+
if remoteNodeID == 0 && !ctx.TestingAllowNamedRPCToAnonymousServer {
716+
log.Fatalf(context.TODO(), "invalid node ID 0 in GRPCDialNode()")
717+
}
718+
return ctx.grpcDialNodeInternal(target, remoteNodeID)
719+
}
720+
721+
func (ctx *Context) grpcDialNodeInternal(target string, remoteNodeID roachpb.NodeID) *Connection {
722+
thisConnKey := connKey{target, remoteNodeID}
723+
value, ok := ctx.conns.Load(thisConnKey)
681724
if !ok {
682-
value, _ = ctx.conns.LoadOrStore(target, newConnection(ctx.Stopper))
725+
value, _ = ctx.conns.LoadOrStore(thisConnKey, newConnectionToNodeID(ctx.Stopper, remoteNodeID))
726+
if remoteNodeID != 0 {
727+
// If the first connection established at a target address is
728+
// for a specific node ID, then we want to reuse that connection
729+
// also for other dials (eg for gossip) which don't require a
730+
// specific node ID. (We do this as an optimization to reduce
731+
// the number of TCP connections alive between nodes. This is
732+
// not strictly required for correctness.) This LoadOrStore will
733+
// ensure we're registering the connection we just created for
734+
// future use by these other dials.
735+
_, _ = ctx.conns.LoadOrStore(connKey{target, 0}, value)
736+
}
683737
}
684738

685739
conn := value.(*Connection)
@@ -694,11 +748,11 @@ func (ctx *Context) GRPCDial(target string) *Connection {
694748
if err != nil && !grpcutil.IsClosedConnection(err) {
695749
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
696750
}
697-
ctx.removeConn(target, conn)
751+
ctx.removeConn(thisConnKey, conn)
698752
})
699753
}); err != nil {
700754
conn.dialErr = err
701-
ctx.removeConn(target, conn)
755+
ctx.removeConn(thisConnKey, conn)
702756
}
703757
}
704758
})
@@ -720,7 +774,7 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
720774
// the first heartbeat.
721775
var ErrNotHeartbeated = errors.New("not yet heartbeated")
722776

723-
// ConnHealth returns nil if we have an open connection to the given
777+
// TestingConnHealth returns nil if we have an open connection to the given
724778
// target that succeeded on its most recent heartbeat. Otherwise, it
725779
// kicks off a connection attempt (unless one is already in progress
726780
// or we are in a backoff state) and returns an error (typically
@@ -729,27 +783,26 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated")
729783
// error will be returned. This method should therefore be used to
730784
// prioritize among a list of candidate nodes, but not to filter out
731785
// "unhealthy" nodes.
732-
func (ctx *Context) ConnHealth(target string) error {
733-
if ctx.GetLocalInternalClientForAddr(target) != nil {
786+
//
787+
// This is used in tests only; in clusters use (*Dialer).ConnHealth()
788+
// instead which automates the address resolution.
789+
//
790+
// TODO(knz): remove this altogether. Use the dialer in all cases.
791+
func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error {
792+
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
734793
// The local server is always considered healthy.
735794
return nil
736795
}
737-
conn := ctx.GRPCDial(target)
796+
conn := ctx.GRPCDialNode(target, nodeID)
738797
return conn.Health()
739798
}
740799

741800
func (ctx *Context) runHeartbeat(
742801
conn *Connection, target string, redialChan <-chan struct{},
743802
) error {
744803
maxOffset := ctx.LocalClock.MaxOffset()
745-
clusterID := ctx.ClusterID.Get()
804+
maxOffsetNanos := maxOffset.Nanoseconds()
746805

747-
request := PingRequest{
748-
Addr: ctx.Addr,
749-
MaxOffsetNanos: maxOffset.Nanoseconds(),
750-
ClusterID: &clusterID,
751-
ServerVersion: ctx.version.ServerVersion,
752-
}
753806
heartbeatClient := NewHeartbeatClient(conn.grpcConn)
754807

755808
var heartbeatTimer timeutil.Timer
@@ -768,14 +821,24 @@ func (ctx *Context) runHeartbeat(
768821
heartbeatTimer.Read = true
769822
}
770823

824+
// We re-mint the PingRequest to pick up any asynchronous update to clusterID.
825+
clusterID := ctx.ClusterID.Get()
826+
request := &PingRequest{
827+
Addr: ctx.Addr,
828+
MaxOffsetNanos: maxOffsetNanos,
829+
ClusterID: &clusterID,
830+
NodeID: conn.remoteNodeID,
831+
ServerVersion: ctx.version.ServerVersion,
832+
}
833+
771834
var response *PingResponse
772835
sendTime := ctx.LocalClock.PhysicalTime()
773836
err := contextutil.RunWithTimeout(ctx.masterCtx, "rpc heartbeat", ctx.heartbeatTimeout,
774837
func(goCtx context.Context) error {
775838
// NB: We want the request to fail-fast (the default), otherwise we won't
776839
// be notified of transport failures.
777840
var err error
778-
response, err = heartbeatClient.Ping(goCtx, &request)
841+
response, err = heartbeatClient.Ping(goCtx, request)
779842
return err
780843
})
781844

0 commit comments

Comments
 (0)