Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc,nodedialer: excise methods which do not use ConnectionClass #39398

Merged
merged 1 commit into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
Expand Down Expand Up @@ -250,6 +251,7 @@ func TestRemoveDeadReplicas(t *testing.T) {
grpcConn, err := tc.Server(0).RPCContext().GRPCDialNode(
tc.Server(0).ServingRPCAddr(),
tc.Server(0).NodeID(),
rpc.DefaultClass,
).Connect(ctx)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func grpcTransportFactoryImpl(
) (Transport, error) {
clients := make([]batchClient, 0, len(replicas))
for _, replica := range replicas {
healthy := nodeDialer.ConnHealth(replica.NodeID) == nil
healthy := nodeDialer.ConnHealth(replica.NodeID, opts.class) == nil
clients = append(clients, batchClient{
replica: replica.ReplicaDescriptor,
healthy: healthy,
Expand Down
32 changes: 2 additions & 30 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,14 +818,14 @@ func (ctx *Context) GRPCUnvalidatedDial(target string) *Connection {
return ctx.grpcDialNodeInternal(target, 0, SystemClass)
}

// GRPCDialNodeClass calls grpc.Dial with options appropriate for the
// GRPCDialNode calls grpc.Dial with options appropriate for the
// context and class (see the comment on ConnectionClass).
//
// The remoteNodeID becomes a constraint on the expected node ID of
// the remote node; this is checked during heartbeats. The caller is
// responsible for ensuring the remote node ID is known prior to using
// this function.
func (ctx *Context) GRPCDialNodeClass(
func (ctx *Context) GRPCDialNode(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
if remoteNodeID == 0 && !ctx.TestingAllowNamedRPCToAnonymousServer {
Expand All @@ -834,11 +834,6 @@ func (ctx *Context) GRPCDialNodeClass(
return ctx.grpcDialNodeInternal(target, remoteNodeID, class)
}

// GRPCDialNode is a shorthand for calling GRPCDialNodeClass with DefaultClass.
func (ctx *Context) GRPCDialNode(target string, remoteNodeID roachpb.NodeID) *Connection {
return ctx.GRPCDialNodeClass(target, remoteNodeID, DefaultClass)
}

func (ctx *Context) grpcDialNodeInternal(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
Expand Down Expand Up @@ -912,29 +907,6 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
// the first heartbeat.
var ErrNotHeartbeated = errors.New("not yet heartbeated")

// TestingConnHealth returns nil if we have an open connection to the given
// target that succeeded on its most recent heartbeat. Otherwise, it
// kicks off a connection attempt (unless one is already in progress
// or we are in a backoff state) and returns an error (typically
// ErrNotHeartbeated). This is a conservative/pessimistic indicator:
// if we have not attempted to talk to the target node recently, an
// error will be returned. This method should therefore be used to
// prioritize among a list of candidate nodes, but not to filter out
// "unhealthy" nodes.
//
// This is used in tests only; in clusters use (*Dialer).ConnHealth()
// instead which automates the address resolution.
//
// TODO(knz): remove this altogether. Use the dialer in all cases.
func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error {
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
// The local server is always considered healthy.
return nil
}
conn := ctx.GRPCDialNode(target, nodeID)
return conn.Health()
}

func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) (retErr error) {
Expand Down
61 changes: 42 additions & 19 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ import (
"google.golang.org/grpc/status"
)

// TestingConnHealth returns nil if we have an open connection to the given
// target with DefaultClass that succeeded on its most recent heartbeat.
// Otherwise, it kicks off a connection attempt (unless one is already in
// progress or we are in a backoff state) and returns an error (typically
// ErrNotHeartbeated). This is a conservative/pessimistic indicator:
// if we have not attempted to talk to the target node recently, an
// error will be returned. This method should therefore be used to
// prioritize among a list of candidate nodes, but not to filter out
// "unhealthy" nodes.
//
// This is used in tests only; in clusters use (*Dialer).ConnHealth()
// instead which automates the address resolution.
//
// TODO(knz): remove this altogether. Use the dialer in all cases.
func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error {
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
// The local server is always considered healthy.
return nil
}
conn := ctx.GRPCDialNode(target, nodeID, DefaultClass)
return conn.Health()
}

// AddTestingDialOpts adds extra dialing options to the rpc Context. This should
// be done before GRPCDial is called.
func (ctx *Context) AddTestingDialOpts(opts ...grpc.DialOption) {
Expand Down Expand Up @@ -128,7 +151,7 @@ func TestHeartbeatCB(t *testing.T) {
})
}

if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -257,7 +280,7 @@ func TestHeartbeatHealth(t *testing.T) {
}
remoteAddr := ln.Addr().String()
if _, err := clientCtx.GRPCDialNode(
remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -402,7 +425,7 @@ func TestConnectionRemoveNodeIDZero(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
clientCtx := newTestContext(uuid.MakeV4(), clock, stopper)
// Provoke an error.
_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1).Connect(context.Background())
_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1, DefaultClass).Connect(context.Background())
if err == nil {
t.Fatal("expected some kind of error, got nil")
}
Expand Down Expand Up @@ -509,7 +532,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {
clientCtx := newTestContext(clusterID, clock, stopper)
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}
// Everything is normal; should become healthy.
Expand Down Expand Up @@ -585,7 +608,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {

// We can reconnect and the connection becomes healthy again.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
return err
}
return clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
Expand Down Expand Up @@ -664,7 +687,7 @@ func TestOffsetMeasurement(t *testing.T) {
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond
clientCtx.RemoteClocks.offsetTTL = 5 * clientAdvancing.getAdvancementInterval()
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -736,7 +759,7 @@ func TestFailedOffsetMeasurement(t *testing.T) {
// clock reading delay, not the timeout.
clientCtx.heartbeatTimeout = 0
go func() { heartbeat.ready <- nil }() // Allow one heartbeat for initialization.
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background()); err != nil {
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -846,7 +869,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
if _, err := clientNodeContext.ctx.GRPCDialNode(
serverNodeContext.ctx.Addr,
serverNodeContext.ctx.NodeID.Get(),
).Connect(context.Background()); err != nil {
DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1068,7 +1091,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
grpc.WithKeepaliveParams(cKeepalive),
)
log.Infof(ctx, "dialing server")
conn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(ctx)
conn, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1236,7 +1259,7 @@ func TestClusterIDMismatch(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
_, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background())
_, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())
expected := "initial connection heartbeat failed.*doesn't match server cluster ID"
if !testutils.IsError(err, expected) {
t.Errorf("expected %s error, got %v", expected, err)
Expand Down Expand Up @@ -1357,7 +1380,7 @@ func TestNodeIDMismatch(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
_, err := clientCtx.GRPCDialNode(remoteAddr, 2).Connect(context.Background())
_, err := clientCtx.GRPCDialNode(remoteAddr, 2, DefaultClass).Connect(context.Background())
expected := "initial connection heartbeat failed.*doesn't match server node ID"
if !testutils.IsError(err, expected) {
t.Errorf("expected %s error, got %v", expected, err)
Expand Down Expand Up @@ -1433,7 +1456,7 @@ func TestVersionCheckBidirectional(t *testing.T) {
t.Fatal(err)
}

_, err = clientCtx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background())
_, err = clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())

if td.expectError {
expected := "initial connection heartbeat failed.*cluster requires at least version"
Expand Down Expand Up @@ -1473,8 +1496,8 @@ func TestGRPCDialClass(t *testing.T) {
remoteAddr := ln.Addr().String()
clientCtx := newTestContext(serverCtx.ClusterID.Get(), clock, stopper)

def1 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, DefaultClass)
sys1 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, SystemClass)
def1 := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass)
sys1 := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass)
require.False(t, sys1 == def1,
"expected connections dialed with different classes to the same target to differ")
defConn1, err := def1.Connect(context.TODO())
Expand All @@ -1483,10 +1506,10 @@ func TestGRPCDialClass(t *testing.T) {
require.Nil(t, err, "expected successful connection")
require.False(t, sysConn1 == defConn1, "expected connections dialed with "+
"different classes to the sametarget to have separate underlying gRPC connections")
def2 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, DefaultClass)
def2 := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass)
require.True(t, def1 == def2, "expected connections dialed with the same "+
"class to the same target to be the same")
sys2 := clientCtx.GRPCDialNodeClass(remoteAddr, 1, SystemClass)
sys2 := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass)
require.True(t, sys1 == sys2, "expected connections dialed with the same "+
"class to the same target to be the same")
for _, c := range []*Connection{def2, sys2} {
Expand Down Expand Up @@ -1590,9 +1613,9 @@ func TestTestingKnobs(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
require.Nil(t, err)
remoteAddr := ln.Addr().String()
sysConn, err := clientCtx.GRPCDialNodeClass(remoteAddr, 1, SystemClass).Connect(context.TODO())
sysConn, err := clientCtx.GRPCDialNode(remoteAddr, 1, SystemClass).Connect(context.TODO())
require.Nil(t, err)
defConn, err := clientCtx.GRPCDialNodeClass(remoteAddr, 1, DefaultClass).Connect(context.TODO())
defConn, err := clientCtx.GRPCDialNode(remoteAddr, 1, DefaultClass).Connect(context.TODO())
require.Nil(t, err)
const unaryMethod = "/cockroach.rpc.Testing/Foo"
const streamMethod = "/cockroach.rpc.Testing/Bar"
Expand Down Expand Up @@ -1657,7 +1680,7 @@ func BenchmarkGRPCDial(b *testing.B) {

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := ctx.GRPCDialNode(remoteAddr, serverNodeID).Connect(context.Background())
_, err := ctx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background())
if err != nil {
b.Fatal(err)
}
Expand Down
36 changes: 10 additions & 26 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (n *Dialer) Stopper() *stop.Stopper {
// Silence lint warning because this method is only used in race builds.
var _ = (*Dialer).Stopper

// DialClass returns a grpc connection to the given node. It logs whenever the
// Dial returns a grpc connection to the given node. It logs whenever the
// node first becomes unreachable or reachable.
func (n *Dialer) DialClass(
func (n *Dialer) Dial(
ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
if n == nil || n.resolver == nil {
Expand All @@ -90,11 +90,6 @@ func (n *Dialer) DialClass(
return n.dial(ctx, nodeID, addr, breaker, class)
}

// Dial is shorthand for n.DialClass(ctx, nodeID, rpc.DefaultClass).
func (n *Dialer) Dial(ctx context.Context, nodeID roachpb.NodeID) (_ *grpc.ClientConn, err error) {
return n.DialClass(ctx, nodeID, rpc.DefaultClass)
}

// DialInternalClient is a specialization of DialClass for callers that
// want a roachpb.InternalClient. This supports an optimization to bypass the
// network for the local node. Returns a context.Context which should be used
Expand Down Expand Up @@ -149,7 +144,7 @@ func (n *Dialer) dial(
log.Infof(ctx, "unable to connect to n%d: %s", nodeID, err)
}
}()
conn, err := n.rpcContext.GRPCDialNodeClass(addr.String(), nodeID, class).Connect(ctx)
conn, err := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class).Connect(ctx)
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
Expand Down Expand Up @@ -178,10 +173,10 @@ func (n *Dialer) dial(
return conn, nil
}

// ConnHealthClass returns nil if we have an open connection of the request
// ConnHealth returns nil if we have an open connection of the request
// class to the given node that succeeded on its most recent heartbeat. See the
// method of the same name on rpc.Context for more details.
func (n *Dialer) ConnHealthClass(nodeID roachpb.NodeID, class rpc.ConnectionClass) error {
func (n *Dialer) ConnHealth(nodeID roachpb.NodeID, class rpc.ConnectionClass) error {
if n == nil || n.resolver == nil {
return errors.New("no node dialer configured")
}
Expand All @@ -198,25 +193,14 @@ func (n *Dialer) ConnHealthClass(nodeID roachpb.NodeID, class rpc.ConnectionClas
// The local client is always considered healthy.
return nil
}
conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID)
conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class)
return conn.Health()
}

// ConnHealth is shorthand for n.ConnHealthClass(nodeID, rpc.DefaultClass).
func (n *Dialer) ConnHealth(nodeID roachpb.NodeID) error {
return n.ConnHealthClass(nodeID, rpc.DefaultClass)
}

// GetCircuitBreaker is shorthand for
// n.GetCircuitBreakerClass(nodeID, rpc.DefaultClass).
func (n *Dialer) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker {
return n.GetCircuitBreakerClass(nodeID, rpc.DefaultClass)
}

// GetCircuitBreakerClass retrieves the circuit breaker for connections to the
// GetCircuitBreaker retrieves the circuit breaker for connections to the
// given node. The breaker should not be mutated as this affects all connections
// dialing to that node through this NodeDialer.
func (n *Dialer) GetCircuitBreakerClass(
func (n *Dialer) GetCircuitBreaker(
nodeID roachpb.NodeID, class rpc.ConnectionClass,
) *circuit.Breaker {
return n.getBreaker(nodeID, class).Breaker
Expand All @@ -236,11 +220,11 @@ func (n *Dialer) getBreaker(nodeID roachpb.NodeID, class rpc.ConnectionClass) *w
type dialerAdapter Dialer

func (da *dialerAdapter) Ready(nodeID roachpb.NodeID) bool {
return (*Dialer)(da).GetCircuitBreaker(nodeID).Ready()
return (*Dialer)(da).GetCircuitBreaker(nodeID, rpc.DefaultClass).Ready()
}

func (da *dialerAdapter) Dial(ctx context.Context, nodeID roachpb.NodeID) (ctpb.Client, error) {
c, err := (*Dialer)(da).Dial(ctx, nodeID)
c, err := (*Dialer)(da).Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return nil, err
}
Expand Down
Loading