sql,rpc/nodedialer: improve distsql node health checks #30987

Merged 2 commits on Oct 5, 2018
15 changes: 13 additions & 2 deletions pkg/rpc/nodedialer/nodedialer.go
@@ -144,11 +144,19 @@ func (n *Dialer) DialInternalClient(
if err != nil {
return nil, nil, err
}
// TODO(bdarnell): Reconcile the different health checks and circuit
// breaker behavior in this file
// Check to see if the connection is in the transient failure state. This can
// happen if the connection already existed, but a recent heartbeat has
// failed and we haven't yet torn down the connection.
if err := grpcutil.ConnectionReady(conn); err != nil {
return nil, nil, err
}
// TODO(bdarnell): Reconcile the different health checks and circuit breaker
// behavior in this file. Note that this different behavior causes problems
// for higher-levels in the system. For example, DistSQL checks for
// ConnHealth when scheduling processors, but can then see attempts to send
// RPCs fail when dial fails due to an open breaker. Reset the breaker here
// as a stop-gap before the reconciliation occurs.
n.getBreaker(nodeID).Reset()
return ctx, roachpb.NewInternalClient(conn), nil
}

@@ -159,6 +167,9 @@ func (n *Dialer) ConnHealth(nodeID roachpb.NodeID) error {
if n == nil || n.resolver == nil {
return errors.New("no node dialer configured")
}
if !n.getBreaker(nodeID).Ready() {
return circuit.ErrBreakerOpen
}
addr, err := n.resolver(nodeID)
if err != nil {
return err
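
For context, the gist of the nodedialer change: ConnHealth now fails fast with circuit.ErrBreakerOpen while a node's breaker is open, and a successful DialInternalClient resets the breaker, so the health signal DistSQL consults converges with actual dialability. Below is a minimal, self-contained sketch of that contract; toyBreaker, connHealth, and dial are illustrative stand-ins, not the real rpc/nodedialer types.

package main

import (
	"errors"
	"fmt"
)

// errBreakerOpen stands in for circuit.ErrBreakerOpen.
var errBreakerOpen = errors.New("breaker open")

// toyBreaker stands in for the per-node circuit breaker the Dialer keeps.
type toyBreaker struct{ tripped bool }

func (b *toyBreaker) Ready() bool { return !b.tripped }
func (b *toyBreaker) Reset()      { b.tripped = false }

// connHealth mirrors the new Dialer.ConnHealth: consult the breaker first,
// so schedulers see open breakers instead of planning on undialable nodes.
func connHealth(b *toyBreaker) error {
	if !b.Ready() {
		return errBreakerOpen
	}
	return nil
}

// dial mirrors DialInternalClient's stop-gap: once a connection succeeds,
// reset the breaker so the two signals agree again.
func dial(b *toyBreaker) error {
	// ... establish the gRPC connection here ...
	b.Reset()
	return nil
}

func main() {
	b := &toyBreaker{tripped: true}
	fmt.Println(connHealth(b)) // "breaker open": DistSQL plans around this node
	if err := dial(b); err == nil {
		fmt.Println(connHealth(b)) // <nil>: node is schedulable again
	}
}
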
1 change: 0 additions & 1 deletion pkg/server/server.go
@@ -614,7 +614,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.gossip,
s.stopper,
s.nodeLiveness,
sqlExecutorTestingKnobs.DistSQLPlannerKnobs,
s.nodeDialer,
),

98 changes: 37 additions & 61 deletions pkg/sql/distsql_physical_planner.go
@@ -74,11 +74,9 @@ type DistSQLPlanner struct {
st *cluster.Settings
// The node descriptor for the gateway node that initiated this query.
nodeDesc roachpb.NodeDescriptor
rpcContext *rpc.Context
stopper *stop.Stopper
distSQLSrv *distsqlrun.ServerImpl
spanResolver distsqlplan.SpanResolver
testingKnobs DistSQLPlannerTestingKnobs

// metadataTestTolerance is the minimum level required to plan metadata test
// processors.
@@ -90,10 +88,12 @@ type DistSQLPlanner struct {

// gossip handle used to check node version compatibility.
gossip *gossip.Gossip
// liveness is used to avoid planning on down nodes.
liveness *storage.NodeLiveness

nodeDialer *nodedialer.Dialer

// nodeHealth encapsulates the various node health checks to avoid planning
// on unhealthy nodes.
nodeHealth distSQLNodeHealth
}

const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
@@ -135,26 +135,36 @@ func NewDistSQLPlanner(
gossip *gossip.Gossip,
stopper *stop.Stopper,
liveness *storage.NodeLiveness,
testingKnobs DistSQLPlannerTestingKnobs,
nodeDialer *nodedialer.Dialer,
) *DistSQLPlanner {
if liveness == nil {
panic("must specify liveness")
}
dsp := &DistSQLPlanner{
planVersion: planVersion,
st: st,
nodeDesc: nodeDesc,
rpcContext: rpcCtx,
stopper: stopper,
distSQLSrv: distSQLSrv,
gossip: gossip,
spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
liveness: liveness,
testingKnobs: testingKnobs,
planVersion: planVersion,
st: st,
nodeDesc: nodeDesc,
stopper: stopper,
distSQLSrv: distSQLSrv,
spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
gossip: gossip,
nodeDialer: nodeDialer,
nodeHealth: distSQLNodeHealth{
gossip: gossip,
connHealth: nodeDialer.ConnHealth,
},
metadataTestTolerance: distsqlrun.NoExplain,
nodeDialer: nodeDialer,
}
// NB: not all tests populate a NodeLiveness. Everything using the
// proper constructor NewDistSQLPlanner will, though.
if liveness != nil {
dsp.nodeHealth.isLive = liveness.IsLive
} else {
dsp.nodeHealth.isLive = func(_ roachpb.NodeID) (bool, error) {
return true, nil
}
}

dsp.initRunners()
return dsp
}
Expand Down Expand Up @@ -591,40 +601,13 @@ type SpanPartition struct {
Spans roachpb.Spans
}

func (dsp *DistSQLPlanner) checkNodeHealth(
ctx context.Context, nodeID roachpb.NodeID, addr string,
) error {
// NB: not all tests populate a NodeLiveness. Everything using the
// proper constructor NewDistSQLPlanner will, though.
isLive := func(_ roachpb.NodeID) (bool, error) {
return true, nil
}
if dsp.liveness != nil {
isLive = dsp.liveness.IsLive
}
return checkNodeHealth(ctx, nodeID, addr, dsp.testingKnobs, dsp.gossip, dsp.rpcContext.ConnHealth, isLive)
type distSQLNodeHealth struct {
gossip *gossip.Gossip
connHealth func(roachpb.NodeID) error
isLive func(roachpb.NodeID) (bool, error)
}

func checkNodeHealth(
ctx context.Context,
nodeID roachpb.NodeID,
addr string,
knobs DistSQLPlannerTestingKnobs,
g *gossip.Gossip,
connHealth func(string) error,
isLive func(roachpb.NodeID) (bool, error),
) error {
// Check if the target's node descriptor is gossiped. If it isn't, the node
// is definitely gone and has been for a while.
//
// TODO(tschottdorf): it's not clear that this adds anything to the liveness
// check below. The node descriptor TTL is an hour as of 03/2018.
if _, err := g.GetNodeIDAddress(nodeID); err != nil {
log.VEventf(ctx, 1, "not using n%d because gossip doesn't know about it. "+
"It might have gone away from the cluster. Gossip said: %s.", nodeID, err)
return err
}

func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
{
// NB: as of #22658, ConnHealth does not work as expected; see the
// comment within. We still keep this code for now because in
@@ -633,13 +616,7 @@
// artifact of rpcContext's reconnection mechanism at the time of
// writing). This is better than having it used in 100% of cases
// (until the liveness check below kicks in).
var err error
if knobs.OverrideHealthCheck != nil {
err = knobs.OverrideHealthCheck(nodeID, addr)
} else {
err = connHealth(addr)
}

err := h.connHealth(nodeID)
if err != nil && err != rpc.ErrNotHeartbeated {
// This host is known to be unhealthy. Don't use it (use the gateway
// instead). Note: this can never happen for our nodeID (which
@@ -649,7 +626,7 @@ }
}
}
{
live, err := isLive(nodeID)
live, err := h.isLive(nodeID)
if err == nil && !live {
err = errors.New("node is not live")
}
@@ -660,7 +637,7 @@

// Check that the node is not draining.
drainingInfo := &distsqlrun.DistSQLDrainingInfo{}
if err := g.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
if err := h.gossip.GetInfoProto(gossip.MakeDistSQLDrainingKey(nodeID), drainingInfo); err != nil {
// Because draining info has no expiration, an error
// implies that we have not yet received a node's
// draining information. Since this information is
@@ -757,7 +734,7 @@ func (dsp *DistSQLPlanner) PartitionSpans(
addr, inAddrMap := planCtx.NodeAddresses[nodeID]
if !inAddrMap {
addr = replInfo.NodeDesc.Address.String()
if err := dsp.checkNodeHealth(ctx, nodeID, addr); err != nil {
if err := dsp.nodeHealth.check(ctx, nodeID); err != nil {
addr = ""
}
if err == nil && addr != "" {
@@ -1028,15 +1005,14 @@ func (dsp *DistSQLPlanner) CheckNodeHealthAndVersion(
planCtx *PlanningCtx, desc *roachpb.NodeDescriptor,
) error {
nodeID := desc.NodeID
addr := desc.Address.String()
var err error

if err = dsp.checkNodeHealth(planCtx.ctx, nodeID, addr); err != nil {
if err = dsp.nodeHealth.check(planCtx.ctx, nodeID); err != nil {
err = errors.New("unhealthy")
} else if !dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion) {
err = errors.New("incompatible version")
} else {
planCtx.NodeAddresses[nodeID] = addr
planCtx.NodeAddresses[nodeID] = desc.Address.String()
}
return err
}
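
The net effect of the planner change is that distSQLNodeHealth bundles the three checks (connection health, liveness, draining state) behind plain function fields, which the tests below exploit by stubbing each check independently. A condensed, self-contained sketch of that sequence follows, using hypothetical stand-in types (nodeID, health) rather than the real roachpb/gossip plumbing:

package main

import (
	"errors"
	"fmt"
)

type nodeID int

// health mirrors the shape of distSQLNodeHealth: each check is an injected
// function, so tests can stub them out one at a time.
type health struct {
	connHealth func(nodeID) error
	isLive     func(nodeID) (bool, error)
	isDraining func(nodeID) (bool, error) // stands in for the gossip draining lookup
}

// check runs the same three-step sequence as distSQLNodeHealth.check.
// (Simplification: the real code tolerates rpc.ErrNotHeartbeated from
// connHealth and treats a missing draining record as "not draining".)
func (h health) check(id nodeID) error {
	if err := h.connHealth(id); err != nil {
		return err
	}
	if live, err := h.isLive(id); err != nil {
		return err
	} else if !live {
		return errors.New("node is not live")
	}
	if draining, err := h.isDraining(id); err == nil && draining {
		return errors.New("node is draining")
	}
	return nil
}

func main() {
	h := health{
		connHealth: func(nodeID) error { return nil },
		isLive:     func(nodeID) (bool, error) { return true, nil },
		isDraining: func(nodeID) (bool, error) { return false, nil },
	}
	fmt.Println(h.check(1)) // <nil>: healthy, live, and not draining
}

This is the same injection pattern the test diff below uses: each test constructs a distSQLNodeHealth with exactly the stubs it needs, instead of threading a testing knob through the planner.
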
57 changes: 35 additions & 22 deletions pkg/sql/distsql_physical_planner_test.go
@@ -774,15 +774,19 @@ func TestPartitionSpans(t *testing.T) {
stopper: stopper,
spanResolver: tsp,
gossip: mockGossip,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
nodeHealth: distSQLNodeHealth{
gossip: mockGossip,
connHealth: func(node roachpb.NodeID) error {
for _, n := range tc.deadNodes {
if int(node) == n {
return fmt.Errorf("test node is unhealthy")
}
}
return nil
},
isLive: func(nodeID roachpb.NodeID) (bool, error) {
return true, nil
},
},
}

@@ -954,11 +958,15 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
stopper: stopper,
spanResolver: tsp,
gossip: mockGossip,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
nodeHealth: distSQLNodeHealth{
gossip: mockGossip,
connHealth: func(roachpb.NodeID) error {
// All the nodes are healthy.
return nil
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
},
},
}

@@ -1045,10 +1053,14 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
stopper: stopper,
spanResolver: tsp,
gossip: mockGossip,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
// All the nodes are healthy.
return nil
nodeHealth: distSQLNodeHealth{
gossip: mockGossip,
connHealth: func(node roachpb.NodeID) error {
_, err := mockGossip.GetNodeIDAddress(node)
return err
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
},
},
}
@@ -1119,10 +1131,10 @@ func TestCheckNodeHealth(t *testing.T) {
return true, nil
}

connHealthy := func(string) error {
connHealthy := func(roachpb.NodeID) error {
return nil
}
connUnhealthy := func(string) error {
connUnhealthy := func(roachpb.NodeID) error {
return errors.New("injected conn health error")
}
_ = connUnhealthy
@@ -1138,18 +1150,19 @@

for _, test := range livenessTests {
t.Run("liveness", func(t *testing.T) {
if err := checkNodeHealth(
context.Background(), nodeID, desc.Address.AddressField,
DistSQLPlannerTestingKnobs{}, /* knobs */
mockGossip, connHealthy, test.isLive,
); !testutils.IsError(err, test.exp) {
h := distSQLNodeHealth{
gossip: mockGossip,
connHealth: connHealthy,
isLive: test.isLive,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
}
})
}

connHealthTests := []struct {
connHealth func(string) error
connHealth func(roachpb.NodeID) error
exp string
}{
{connHealthy, ""},
@@ -1158,14 +1171,14 @@

for _, test := range connHealthTests {
t.Run("connHealth", func(t *testing.T) {
if err := checkNodeHealth(
context.Background(), nodeID, desc.Address.AddressField,
DistSQLPlannerTestingKnobs{}, /* knobs */
mockGossip, test.connHealth, live,
); !testutils.IsError(err, test.exp) {
h := distSQLNodeHealth{
gossip: mockGossip,
connHealth: test.connHealth,
isLive: live,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
}
})
}

}
12 changes: 0 additions & 12 deletions pkg/sql/exec_util.go
@@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -361,9 +360,6 @@ type ExecutorTestingKnobs struct {
// execution so there'll be nothing left to abort by the time the filter runs.
DisableAutoCommit bool

// DistSQLPlannerKnobs are testing knobs for DistSQLPlanner.
DistSQLPlannerKnobs DistSQLPlannerTestingKnobs

// BeforeAutoCommit is called when the Executor is about to commit the KV
// transaction after running a statement in an implicit transaction, allowing
// tests to inject errors into that commit.
@@ -378,14 +374,6 @@
BeforeAutoCommit func(ctx context.Context, stmt string) error
}

// DistSQLPlannerTestingKnobs is used to control internals of the DistSQLPlanner
// for testing purposes.
type DistSQLPlannerTestingKnobs struct {
// If OverrideSQLHealthCheck is set, we use this callback to get the health of
// a node.
OverrideHealthCheck func(node roachpb.NodeID, addrString string) error
}

// databaseCacheHolder is a thread-safe container for a *databaseCache.
// It also allows clients to block until the cache is updated to a desired
// state.