Merge pull request #31014 from petermattis/backport2.1-30987
release-2.1: sql,rpc/nodedialer: improve distsql node health checks
petermattis authored Oct 5, 2018
2 parents 041f6bd + dab62ae commit 00ee2a9
Showing 5 changed files with 85 additions and 98 deletions.
15 changes: 13 additions & 2 deletions pkg/rpc/nodedialer/nodedialer.go
@@ -144,11 +144,19 @@ func (n *Dialer) DialInternalClient(
if err != nil {
return nil, nil, err
}
// TODO(bdarnell): Reconcile the different health checks and circuit
// breaker behavior in this file
// Check to see if the connection is in the transient failure state. This can
// happen if the connection already existed, but a recent heartbeat has
// failed and we haven't yet torn down the connection.
if err := grpcutil.ConnectionReady(conn); err != nil {
return nil, nil, err
}
// TODO(bdarnell): Reconcile the different health checks and circuit breaker
// behavior in this file. Note that this different behavior causes problems
// for higher-levels in the system. For example, DistSQL checks for
// ConnHealth when scheduling processors, but can then see attempts to send
// RPCs fail when dial fails due to an open breaker. Reset the breaker here
// as a stop-gap before the reconciliation occurs.
n.getBreaker(nodeID).Reset()
return ctx, roachpb.NewInternalClient(conn), nil
}

@@ -159,6 +167,9 @@ func (n *Dialer) ConnHealth(nodeID roachpb.NodeID) error {
if n == nil || n.resolver == nil {
return errors.New("no node dialer configured")
}
if !n.getBreaker(nodeID).Ready() {
return circuit.ErrBreakerOpen
}
addr, err := n.resolver(nodeID)
if err != nil {
return err
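The two nodedialer hunks above tie the dialer's health reporting to its per-node circuit breaker: ConnHealth now fails fast with circuit.ErrBreakerOpen while the breaker is tripped, and DialInternalClient resets the breaker once it has a ready connection, the stop-gap described in the TODO. Below is a standalone sketch of that interplay using toy types rather than the real cockroach/rpc packages; all names here are hypothetical.

package main

import (
	"errors"
	"fmt"
)

var errBreakerOpen = errors.New("breaker open")

// breaker is a toy, single-goroutine stand-in for circuit.Breaker.
type breaker struct{ tripped bool }

func (b *breaker) Ready() bool { return !b.tripped }
func (b *breaker) Reset()      { b.tripped = false }
func (b *breaker) Trip()       { b.tripped = true }

// dialer keeps one breaker per node, loosely like rpc/nodedialer.Dialer.
type dialer struct {
	breakers map[int]*breaker
}

func (d *dialer) getBreaker(nodeID int) *breaker {
	b, ok := d.breakers[nodeID]
	if !ok {
		b = &breaker{}
		d.breakers[nodeID] = b
	}
	return b
}

// ConnHealth mirrors the new check: an open breaker short-circuits the lookup,
// so callers see the node as unhealthy without attempting a dial.
func (d *dialer) ConnHealth(nodeID int) error {
	if !d.getBreaker(nodeID).Ready() {
		return errBreakerOpen
	}
	// The real code then resolves the address and checks the gRPC connection.
	return nil
}

// DialInternalClient resets the breaker once it has a ready connection;
// this models the stop-gap described in the TODO above.
func (d *dialer) DialInternalClient(nodeID int) error {
	// ... dial and verify the connection is ready ...
	d.getBreaker(nodeID).Reset()
	return nil
}

func main() {
	d := &dialer{breakers: map[int]*breaker{}}
	d.getBreaker(2).Trip()
	fmt.Println(d.ConnHealth(1)) // <nil>
	fmt.Println(d.ConnHealth(2)) // breaker open
	_ = d.DialInternalClient(2)  // a successful dial clears the breaker
	fmt.Println(d.ConnHealth(2)) // <nil>
}

The point of the pattern is that higher levels such as the DistSQL planner can treat an open breaker as "unhealthy" without issuing RPCs, while a later successful dial clears the state.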
1 change: 0 additions & 1 deletion pkg/server/server.go
@@ -611,7 +611,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.gossip,
s.stopper,
s.nodeLiveness,
sqlExecutorTestingKnobs.DistSQLPlannerKnobs,
s.nodeDialer,
),

98 changes: 37 additions & 61 deletions pkg/sql/distsql_physical_planner.go
@@ -74,11 +74,9 @@ type DistSQLPlanner struct {
st *cluster.Settings
// The node descriptor for the gateway node that initiated this query.
nodeDesc roachpb.NodeDescriptor
rpcContext *rpc.Context
stopper *stop.Stopper
distSQLSrv *distsqlrun.ServerImpl
spanResolver distsqlplan.SpanResolver
testingKnobs DistSQLPlannerTestingKnobs

// metadataTestTolerance is the minimum level required to plan metadata test
// processors.
@@ -90,10 +88,12 @@ type DistSQLPlanner struct {

// gossip handle used to check node version compatibility.
gossip *gossip.Gossip
// liveness is used to avoid planning on down nodes.
liveness *storage.NodeLiveness

nodeDialer *nodedialer.Dialer

// nodeHealth encapsulates the various node health checks to avoid planning
// on unhealthy nodes.
nodeHealth distSQLNodeHealth
}

const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
@@ -135,26 +135,36 @@ func NewDistSQLPlanner(
gossip *gossip.Gossip,
stopper *stop.Stopper,
liveness *storage.NodeLiveness,
testingKnobs DistSQLPlannerTestingKnobs,
nodeDialer *nodedialer.Dialer,
) *DistSQLPlanner {
if liveness == nil {
panic("must specify liveness")
}
dsp := &DistSQLPlanner{
planVersion: planVersion,
st: st,
nodeDesc: nodeDesc,
rpcContext: rpcCtx,
stopper: stopper,
distSQLSrv: distSQLSrv,
gossip: gossip,
spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
liveness: liveness,
testingKnobs: testingKnobs,
planVersion: planVersion,
st: st,
nodeDesc: nodeDesc,
stopper: stopper,
distSQLSrv: distSQLSrv,
spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
gossip: gossip,
nodeDialer: nodeDialer,
nodeHealth: distSQLNodeHealth{
gossip: gossip,
connHealth: nodeDialer.ConnHealth,
},
metadataTestTolerance: distsqlrun.NoExplain,
nodeDialer: nodeDialer,
}
// NB: not all tests populate a NodeLiveness. Everything using the
// proper constructor NewDistSQLPlanner will, though.
if liveness != nil {
dsp.nodeHealth.isLive = liveness.IsLive
} else {
dsp.nodeHealth.isLive = func(_ roachpb.NodeID) (bool, error) {
return true, nil
}
}

dsp.initRunners()
return dsp
}
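The constructor hunk above now receives the health checks as injected dependencies and, because not every test supplies a NodeLiveness, defaults isLive to an always-live stub. A condensed, standalone illustration of that injection-with-fallback pattern follows (hypothetical names and types, not the real NewDistSQLPlanner signature).

package main

import "fmt"

// nodeHealthDeps is a hypothetical stand-in for the injected health dependencies.
type nodeHealthDeps struct {
	connHealth func(nodeID int) error
	isLive     func(nodeID int) (bool, error)
}

// newDeps mirrors the constructor's fallback: if no liveness source is supplied,
// isLive defaults to "always live" so planning is never blocked in such tests.
func newDeps(connHealth func(int) error, isLive func(int) (bool, error)) nodeHealthDeps {
	deps := nodeHealthDeps{connHealth: connHealth}
	if isLive != nil {
		deps.isLive = isLive
	} else {
		deps.isLive = func(int) (bool, error) { return true, nil }
	}
	return deps
}

func main() {
	deps := newDeps(func(int) error { return nil }, nil)
	live, err := deps.isLive(7)
	fmt.Println(live, err) // true <nil>
}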
@@ -588,40 +598,13 @@ type SpanPartition struct {
Spans roachpb.Spans
}

func (dsp *DistSQLPlanner) checkNodeHealth(
ctx context.Context, nodeID roachpb.NodeID, addr string,
) error {
// NB: not all tests populate a NodeLiveness. Everything using the
// proper constructor NewDistSQLPlanner will, though.
isLive := func(_ roachpb.NodeID) (bool, error) {
return true, nil
}
if dsp.liveness != nil {
isLive = dsp.liveness.IsLive
}
return checkNodeHealth(ctx, nodeID, addr, dsp.testingKnobs, dsp.gossip, dsp.rpcContext.ConnHealth, isLive)
type distSQLNodeHealth struct {
gossip *gossip.Gossip
connHealth func(roachpb.NodeID) error
isLive func(roachpb.NodeID) (bool, error)
}

func checkNodeHealth(
ctx context.Context,
nodeID roachpb.NodeID,
addr string,
knobs DistSQLPlannerTestingKnobs,
g *gossip.Gossip,
connHealth func(string) error,
isLive func(roachpb.NodeID) (bool, error),
) error {
// Check if the target's node descriptor is gossiped. If it isn't, the node
// is definitely gone and has been for a while.
//
// TODO(tschottdorf): it's not clear that this adds anything to the liveness
// check below. The node descriptor TTL is an hour as of 03/2018.
if _, err := g.GetNodeIDAddress(nodeID); err != nil {
log.VEventf(ctx, 1, "not using n%d because gossip doesn't know about it. "+
"It might have gone away from the cluster. Gossip said: %s.", nodeID, err)
return err
}

func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
{
// NB: as of #22658, ConnHealth does not work as expected; see the
// comment within. We still keep this code for now because in
@@ -630,13 +613,7 @@ func checkNodeHealth(
// artifact of rpcContext's reconnection mechanism at the time of
// writing). This is better than having it used in 100% of cases
// (until the liveness check below kicks in).
var err error
if knobs.OverrideHealthCheck != nil {
err = knobs.OverrideHealthCheck(nodeID, addr)
} else {
err = connHealth(addr)
}

err := h.connHealth(nodeID)
if err != nil && err != rpc.ErrNotHeartbeated {
// This host is known to be unhealthy. Don't use it (use the gateway
// instead). Note: this can never happen for our nodeID (which
@@ -646,7 +623,7 @@
}
}
{
live, err := isLive(nodeID)
live, err := h.isLive(nodeID)
if err == nil && !live {
err = errors.New("node is not live")
}
@@ -657,7 +634,7 @@

// Check that the node is not draining.
drainingInfo := &distsqlrun.DistSQLDrainingInfo{}
if err := g.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
if err := h.gossip.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
// Because draining info has no expiration, an error
// implies that we have not yet received a node's
// draining information. Since this information is
@@ -754,7 +731,7 @@ func (dsp *DistSQLPlanner) PartitionSpans(
addr, inAddrMap := planCtx.NodeAddresses[nodeID]
if !inAddrMap {
addr = replInfo.NodeDesc.Address.String()
if err := dsp.checkNodeHealth(ctx, nodeID, addr); err != nil {
if err := dsp.nodeHealth.check(ctx, nodeID); err != nil {
addr = ""
}
if err == nil && addr != "" {
@@ -1021,15 +998,14 @@ func (dsp *DistSQLPlanner) CheckNodeHealthAndVersion(
planCtx *PlanningCtx, desc *roachpb.NodeDescriptor,
) error {
nodeID := desc.NodeID
addr := desc.Address.String()
var err error

if err = dsp.checkNodeHealth(planCtx.ctx, nodeID, addr); err != nil {
if err = dsp.nodeHealth.check(planCtx.ctx, nodeID); err != nil {
err = errors.New("unhealthy")
} else if !dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion) {
err = errors.New("incompatible version")
} else {
planCtx.NodeAddresses[nodeID] = addr
planCtx.NodeAddresses[nodeID] = desc.Address.String()
}
return err
}
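Taken together, the planner hunks above replace the free-standing checkNodeHealth function and its testing knob with a distSQLNodeHealth value whose gossip handle, connHealth, and isLive dependencies are injected. The check keeps the same order: connection health (tolerating rpc.ErrNotHeartbeated), then liveness, then the node's DistSQL draining state from gossip. A self-contained sketch of that flow, with the gossip draining lookup reduced to an injected function and all names hypothetical:

package main

import (
	"errors"
	"fmt"
)

// errNotHeartbeated stands in for rpc.ErrNotHeartbeated, which the check tolerates.
var errNotHeartbeated = errors.New("not yet heartbeated")

// nodeHealth is a standalone stand-in for distSQLNodeHealth: every dependency is
// a function, so tests (and this sketch) can inject behavior directly.
type nodeHealth struct {
	connHealth func(nodeID int) error
	isLive     func(nodeID int) (bool, error)
	isDraining func(nodeID int) (bool, error) // the real code reads gossip's draining info
}

// check mirrors the order of distSQLNodeHealth.check: connection health,
// then liveness, then DistSQL draining state.
func (h nodeHealth) check(nodeID int) error {
	if err := h.connHealth(nodeID); err != nil && err != errNotHeartbeated {
		// Known-unhealthy host: don't plan on it.
		return err
	}
	live, err := h.isLive(nodeID)
	if err == nil && !live {
		err = errors.New("node is not live")
	}
	if err != nil {
		return fmt.Errorf("not using n%d due to liveness: %v", nodeID, err)
	}
	draining, err := h.isDraining(nodeID)
	if err != nil {
		// As in the real code, missing draining info is treated as not draining.
		return nil
	}
	if draining {
		return fmt.Errorf("n%d is draining", nodeID)
	}
	return nil
}

func main() {
	h := nodeHealth{
		connHealth: func(int) error { return errNotHeartbeated }, // tolerated
		isLive:     func(id int) (bool, error) { return id != 3, nil },
		isDraining: func(id int) (bool, error) { return id == 2, nil },
	}
	for _, id := range []int{1, 2, 3} {
		fmt.Printf("n%d: %v\n", id, h.check(id))
	}
}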
57 changes: 35 additions & 22 deletions pkg/sql/distsql_physical_planner_test.go
@@ -774,15 +774,19 @@ func TestPartitionSpans(t *testing.T) {
stopper: stopper,
spanResolver: tsp,
gossip: mockGossip,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
nodeHealth: distSQLNodeHealth{
gossip: mockGossip,
connHealth: func(node roachpb.NodeID) error {
for _, n := range tc.deadNodes {
if int(node) == n {
return fmt.Errorf("test node is unhealthy")
}
}
return nil
},
isLive: func(nodeID roachpb.NodeID) (bool, error) {
return true, nil
},
},
}

@@ -954,11 +958,15 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
stopper: stopper,
spanResolver: tsp,
gossip: mockGossip,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
nodeHealth: distSQLNodeHealth{
gossip: mockGossip,
connHealth: func(roachpb.NodeID) error {
// All the nodes are healthy.
return nil
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
},
},
}

@@ -1045,10 +1053,14 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
stopper: stopper,
spanResolver: tsp,
gossip: mockGossip,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
// All the nodes are healthy.
return nil
nodeHealth: distSQLNodeHealth{
gossip: mockGossip,
connHealth: func(node roachpb.NodeID) error {
_, err := mockGossip.GetNodeIDAddress(node)
return err
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
},
},
}
@@ -1119,10 +1131,10 @@ func TestCheckNodeHealth(t *testing.T) {
return true, nil
}

connHealthy := func(string) error {
connHealthy := func(roachpb.NodeID) error {
return nil
}
connUnhealthy := func(string) error {
connUnhealthy := func(roachpb.NodeID) error {
return errors.New("injected conn health error")
}
_ = connUnhealthy
@@ -1138,18 +1150,19 @@

for _, test := range livenessTests {
t.Run("liveness", func(t *testing.T) {
if err := checkNodeHealth(
context.Background(), nodeID, desc.Address.AddressField,
DistSQLPlannerTestingKnobs{}, /* knobs */
mockGossip, connHealthy, test.isLive,
); !testutils.IsError(err, test.exp) {
h := distSQLNodeHealth{
gossip: mockGossip,
connHealth: connHealthy,
isLive: test.isLive,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
}
})
}

connHealthTests := []struct {
connHealth func(string) error
connHealth func(roachpb.NodeID) error
exp string
}{
{connHealthy, ""},
@@ -1158,14 +1171,14 @@

for _, test := range connHealthTests {
t.Run("connHealth", func(t *testing.T) {
if err := checkNodeHealth(
context.Background(), nodeID, desc.Address.AddressField,
DistSQLPlannerTestingKnobs{}, /* knobs */
mockGossip, test.connHealth, live,
); !testutils.IsError(err, test.exp) {
h := distSQLNodeHealth{
gossip: mockGossip,
connHealth: test.connHealth,
isLive: live,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
}
})
}

}
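With the OverrideHealthCheck knob gone, the tests above express each scenario by constructing a distSQLNodeHealth directly and stubbing its connHealth and isLive fields. A minimal standalone sketch of that stubbing style (toy types, not the real test harness):

package main

import (
	"errors"
	"fmt"
)

// health is a minimal stand-in for distSQLNodeHealth, built per test case.
type health struct {
	connHealth func(nodeID int) error
	isLive     func(nodeID int) (bool, error)
}

func main() {
	deadNodes := map[int]bool{2: true}

	// The pattern from TestPartitionSpans above: connHealth fails for the
	// scenario's dead nodes, and liveness is stubbed to always-true.
	h := health{
		connHealth: func(nodeID int) error {
			if deadNodes[nodeID] {
				return errors.New("test node is unhealthy")
			}
			return nil
		},
		isLive: func(int) (bool, error) { return true, nil },
	}

	for _, id := range []int{1, 2} {
		fmt.Printf("n%d connHealth: %v\n", id, h.connHealth(id))
	}
}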
12 changes: 0 additions & 12 deletions pkg/sql/exec_util.go
@@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -359,9 +358,6 @@ type ExecutorTestingKnobs struct {
// execution so there'll be nothing left to abort by the time the filter runs.
DisableAutoCommit bool

// DistSQLPlannerKnobs are testing knobs for DistSQLPlanner.
DistSQLPlannerKnobs DistSQLPlannerTestingKnobs

// BeforeAutoCommit is called when the Executor is about to commit the KV
// transaction after running a statement in an implicit transaction, allowing
// tests to inject errors into that commit.
@@ -376,14 +372,6 @@
BeforeAutoCommit func(ctx context.Context, stmt string) error
}

// DistSQLPlannerTestingKnobs is used to control internals of the DistSQLPlanner
// for testing purposes.
type DistSQLPlannerTestingKnobs struct {
// If OverrideSQLHealthCheck is set, we use this callback to get the health of
// a node.
OverrideHealthCheck func(node roachpb.NodeID, addrString string) error
}

// databaseCacheHolder is a thread-safe container for a *databaseCache.
// It also allows clients to block until the cache is updated to a desired
// state.
