Skip to content

Commit

Permalink
Merge #39398
Browse files Browse the repository at this point in the history
39398: rpc,nodedialer: excise methods which do not use ConnectionClass r=ajwerner a=ajwerner

#39172 added the concept of a connection class to create multiple connections
to a given target. In order to reduce the adoption burden, that PR introduced new
methods for dialing connections, checking connection health, and retrieving circuit
breakers, which took a ConnectionClass and carried a `Class` suffix. It left the previous
method signatures untouched, opting instead to convert them to shorthands which passed DefaultClass to the new methods.

This PR moves all clients of these methods to use ConnectionClass explicitly.

Release note: None

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
  • Loading branch information
craig[bot] and ajwerner committed Aug 21, 2019
2 parents 9e76362 + b3190af commit 727df28
Show file tree
Hide file tree
Showing 32 changed files with 155 additions and 140 deletions.
2 changes: 2 additions & 0 deletions pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
Expand Down Expand Up @@ -250,6 +251,7 @@ func TestRemoveDeadReplicas(t *testing.T) {
grpcConn, err := tc.Server(0).RPCContext().GRPCDialNode(
tc.Server(0).ServingRPCAddr(),
tc.Server(0).NodeID(),
rpc.DefaultClass,
).Connect(ctx)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func grpcTransportFactoryImpl(
) (Transport, error) {
clients := make([]batchClient, 0, len(replicas))
for _, replica := range replicas {
healthy := nodeDialer.ConnHealth(replica.NodeID) == nil
healthy := nodeDialer.ConnHealth(replica.NodeID, opts.class) == nil
clients = append(clients, batchClient{
replica: replica.ReplicaDescriptor,
healthy: healthy,
Expand Down
32 changes: 2 additions & 30 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,14 +818,14 @@ func (ctx *Context) GRPCUnvalidatedDial(target string) *Connection {
return ctx.grpcDialNodeInternal(target, 0, SystemClass)
}

// GRPCDialNodeClass calls grpc.Dial with options appropriate for the
// GRPCDialNode calls grpc.Dial with options appropriate for the
// context and class (see the comment on ConnectionClass).
//
// The remoteNodeID becomes a constraint on the expected node ID of
// the remote node; this is checked during heartbeats. The caller is
// responsible for ensuring the remote node ID is known prior to using
// this function.
func (ctx *Context) GRPCDialNodeClass(
func (ctx *Context) GRPCDialNode(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
if remoteNodeID == 0 && !ctx.TestingAllowNamedRPCToAnonymousServer {
Expand All @@ -834,11 +834,6 @@ func (ctx *Context) GRPCDialNodeClass(
return ctx.grpcDialNodeInternal(target, remoteNodeID, class)
}

// GRPCDialNode is a shorthand for calling GRPCDialNodeClass with DefaultClass.
func (ctx *Context) GRPCDialNode(target string, remoteNodeID roachpb.NodeID) *Connection {
return ctx.GRPCDialNodeClass(target, remoteNodeID, DefaultClass)
}

func (ctx *Context) grpcDialNodeInternal(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
Expand Down Expand Up @@ -912,29 +907,6 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
// the first heartbeat.
var ErrNotHeartbeated = errors.New("not yet heartbeated")

// TestingConnHealth returns nil if we have an open connection to the given
// target that succeeded on its most recent heartbeat. Otherwise, it
// kicks off a connection attempt (unless one is already in progress
// or we are in a backoff state) and returns an error (typically
// ErrNotHeartbeated). This is a conservative/pessimistic indicator:
// if we have not attempted to talk to the target node recently, an
// error will be returned. This method should therefore be used to
// prioritize among a list of candidate nodes, but not to filter out
// "unhealthy" nodes.
//
// This is used in tests only; in clusters use (*Dialer).ConnHealth()
// instead which automates the address resolution.
//
// TODO(knz): remove this altogether. Use the dialer in all cases.
func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error {
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
// The local server is always considered healthy.
return nil
}
conn := ctx.GRPCDialNode(target, nodeID)
return conn.Health()
}

func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) (retErr error) {
Expand Down
61 changes: 42 additions & 19 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ import (
"google.golang.org/grpc/status"
)

// TestingConnHealth returns nil if we have an open connection to the given
// target with DefaultClass that succeeded on its most recent heartbeat.
// Otherwise, it kicks off a connection attempt (unless one is already in
// progress or we are in a backoff state) and returns an error (typically
// ErrNotHeartbeated). This is a conservative/pessimistic indicator:
// if we have not attempted to talk to the target node recently, an
// error will be returned. This method should therefore be used to
// prioritize among a list of candidate nodes, but not to filter out
// "unhealthy" nodes.
//
// This is used in tests only; in clusters use (*Dialer).ConnHealth()
// instead which automates the address resolution.
//
// TODO(knz): remove this altogether. Use the dialer in all cases.
func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error {
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
// The local server is always considered healthy.
return nil
}
conn := ctx.GRPCDialNode(target, nodeID, DefaultClass)
return conn.Health()
}

// AddTestingDialOpts adds extra dialing options to the rpc Context. This should
// be done before GRPCDial is called.
func (ctx *Context) AddTestingDialOpts(opts ...grpc.DialOption) {
Expand Down Expand Up @@ -128,7 +151,7 @@ func TestHeartbeatCB(t *testing.T) {
})
}

if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -257,7 +280,7 @@ func TestHeartbeatHealth(t *testing.T) {
}
remoteAddr := ln.Addr().String()
if _, err := clientCtx.GRPCDialNode(
remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -402,7 +425,7 @@ func TestConnectionRemoveNodeIDZero(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
clientCtx := newTestContext(uuid.MakeV4(), clock, stopper)
// Provoke an error.
_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1).Connect(context.Background())
_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1, DefaultClass).Connect(context.Background())
if err == nil {
t.Fatal("expected some kind of error, got nil")
}
Expand Down Expand Up @@ -509,7 +532,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {
clientCtx := newTestContext(clusterID, clock, stopper)
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}
// Everything is normal; should become healthy.
Expand Down Expand Up @@ -585,7 +608,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {

// We can reconnect and the connection becomes healthy again.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
return err
}
return clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
Expand Down Expand Up @@ -664,7 +687,7 @@ func TestOffsetMeasurement(t *testing.T) {
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond
clientCtx.RemoteClocks.offsetTTL = 5 * clientAdvancing.getAdvancementInterval()
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -736,7 +759,7 @@ func TestFailedOffsetMeasurement(t *testing.T) {
// clock reading delay, not the timeout.
clientCtx.heartbeatTimeout = 0
go func() { heartbeat.ready <- nil }() // Allow one heartbeat for initialization.
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -846,7 +869,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
if _, err := clientNodeContext.ctx.GRPCDialNode(
serverNodeContext.ctx.Addr,
serverNodeContext.ctx.NodeID.Get(),
).Connect(context.Background()); err != nil {
DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1068,7 +1091,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
grpc.WithKeepaliveParams(cKeepalive),
)
log.Infof(ctx, "dialing server")
conn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(ctx)
conn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1236,7 +1259,7 @@ func TestClusterIDMismatch(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
_, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background())
_, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())
expected := "initial connection heartbeat failed.*doesn't match server cluster ID"
if !testutils.IsError(err, expected) {
t.Errorf("expected %s error, got %v", expected, err)
Expand Down Expand Up @@ -1357,7 +1380,7 @@ func TestNodeIDMismatch(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
_, err := clientCtx.GRPCDialNode(remoteAddr, 2).Connect(context.Background())
_, err := clientCtx.GRPCDialNode(remoteAddr, 2, DefaultClass).Connect(context.Background())
expected := "initial connection heartbeat failed.*doesn't match server node ID"
if !testutils.IsError(err, expected) {
t.Errorf("expected %s error, got %v", expected, err)
Expand Down Expand Up @@ -1433,7 +1456,7 @@ func TestVersionCheckBidirectional(t *testing.T) {
t.Fatal(err)
}

_, err = clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background())
_, err = clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())

if td.expectError {
expected := "initial connection heartbeat failed.*cluster requires at least version"
Expand Down Expand Up @@ -1473,8 +1496,8 @@ func TestGRPCDialClass(t *testing.T) {
remoteAddr := ln.Addr().String()
clientCtx := newTestContext(serverCtx.ClusterID.Get(), clock, stopper)

def1 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, DefaultClass)
sys1 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, SystemClass)
def1 := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass)
sys1 := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass)
require.False(t, sys1 == def1,
"expected connections dialed with different classes to the same target to differ")
defConn1, err := def1.Connect(context.TODO())
Expand All @@ -1483,10 +1506,10 @@ func TestGRPCDialClass(t *testing.T) {
require.Nil(t, err, "expected successful connection")
require.False(t, sysConn1 == defConn1, "expected connections dialed with "+
"different classes to the sametarget to have separate underlying gRPC connections")
def2 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, DefaultClass)
def2 := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass)
require.True(t, def1 == def2, "expected connections dialed with the same "+
"class to the same target to be the same")
sys2 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, SystemClass)
sys2 := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass)
require.True(t, sys1 == sys2, "expected connections dialed with the same "+
"class to the same target to be the same")
for _, c := range []*Connection{def2, sys2} {
Expand Down Expand Up @@ -1590,9 +1613,9 @@ func TestTestingKnobs(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
require.Nil(t, err)
remoteAddr := ln.Addr().String()
sysConn, err := clientCtx.GRPCDialNodeClass(remoteAddr, 1, SystemClass).Connect(context.TODO())
sysConn, err := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass).Connect(context.TODO())
require.Nil(t, err)
defConn, err := clientCtx.GRPCDialNodeClass(remoteAddr, 1, DefaultClass).Connect(context.TODO())
defConn, err := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass).Connect(context.TODO())
require.Nil(t, err)
const unaryMethod = "/cockroach.rpc.Testing/Foo"
const streamMethod = "/cockroach.rpc.Testing/Bar"
Expand Down Expand Up @@ -1657,7 +1680,7 @@ func BenchmarkGRPCDial(b *testing.B) {

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := ctx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background())
_, err := ctx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())
if err != nil {
b.Fatal(err)
}
Expand Down
36 changes: 10 additions & 26 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (n *Dialer) Stopper() *stop.Stopper {
// Silence lint warning because this method is only used in race builds.
var _ = (*Dialer).Stopper

// DialClass returns a grpc connection to the given node. It logs whenever the
// Dial returns a grpc connection to the given node. It logs whenever the
// node first becomes unreachable or reachable.
func (n *Dialer) DialClass(
func (n *Dialer) Dial(
ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
if n == nil || n.resolver == nil {
Expand All @@ -90,11 +90,6 @@ func (n *Dialer) DialClass(
return n.dial(ctx, nodeID, addr, breaker, class)
}

// Dial is shorthand for n.DialClass(ctx, nodeID, rpc.DefaultClass).
func (n *Dialer) Dial(ctx context.Context, nodeID roachpb.NodeID) (_ *grpc.ClientConn, err error) {
return n.DialClass(ctx, nodeID, rpc.DefaultClass)
}

// DialInternalClient is a specialization of Dial for callers that
// want a roachpb.InternalClient. This supports an optimization to bypass the
// network for the local node. Returns a context.Context which should be used
Expand Down Expand Up @@ -149,7 +144,7 @@ func (n *Dialer) dial(
log.Infof(ctx, "unable to connect to n%d: %s", nodeID, err)
}
}()
conn, err := n.rpcContext.GRPCDialNodeClass(addr.String(), nodeID, class).Connect(ctx)
conn, err := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class).Connect(ctx)
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
Expand Down Expand Up @@ -178,10 +173,10 @@ func (n *Dialer) dial(
return conn, nil
}

// ConnHealthClass returns nil if we have an open connection of the request
// ConnHealth returns nil if we have an open connection of the requested
// class to the given node that succeeded on its most recent heartbeat. See the
// method of the same name on rpc.Context for more details.
func (n *Dialer) ConnHealthClass(nodeID roachpb.NodeID, class rpc.ConnectionClass) error {
func (n *Dialer) ConnHealth(nodeID roachpb.NodeID, class rpc.ConnectionClass) error {
if n == nil || n.resolver == nil {
return errors.New("no node dialer configured")
}
Expand All @@ -198,25 +193,14 @@ func (n *Dialer) ConnHealthClass(nodeID roachpb.NodeID, class rpc.ConnectionClas
// The local client is always considered healthy.
return nil
}
conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID)
conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class)
return conn.Health()
}

// ConnHealth is shorthand for n.ConnHealthClass(nodeID, rpc.DefaultClass).
func (n *Dialer) ConnHealth(nodeID roachpb.NodeID) error {
return n.ConnHealthClass(nodeID, rpc.DefaultClass)
}

// GetCircuitBreaker is shorthand for
// n.GetCircuitBreakerClass(nodeID, rpc.DefaultClass).
func (n *Dialer) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker {
return n.GetCircuitBreakerClass(nodeID, rpc.DefaultClass)
}

// GetCircuitBreakerClass retrieves the circuit breaker for connections to the
// GetCircuitBreaker retrieves the circuit breaker for connections to the
// given node. The breaker should not be mutated as this affects all connections
// dialing to that node through this NodeDialer.
func (n *Dialer) GetCircuitBreakerClass(
func (n *Dialer) GetCircuitBreaker(
nodeID roachpb.NodeID, class rpc.ConnectionClass,
) *circuit.Breaker {
return n.getBreaker(nodeID, class).Breaker
Expand All @@ -236,11 +220,11 @@ func (n *Dialer) getBreaker(nodeID roachpb.NodeID, class rpc.ConnectionClass) *w
type dialerAdapter Dialer

func (da *dialerAdapter) Ready(nodeID roachpb.NodeID) bool {
return (*Dialer)(da).GetCircuitBreaker(nodeID).Ready()
return (*Dialer)(da).GetCircuitBreaker(nodeID, rpc.DefaultClass).Ready()
}

func (da *dialerAdapter) Dial(ctx context.Context, nodeID roachpb.NodeID) (ctpb.Client, error) {
c, err := (*Dialer)(da).Dial(ctx, nodeID)
c, err := (*Dialer)(da).Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 727df28

Please sign in to comment.