Skip to content

Commit

Permalink
Add a Node health controller
Browse files Browse the repository at this point in the history
This will aggregate all HealthStatus objects owned by the Node and update the status of the Node with an overall health.
  • Loading branch information
mkeeler committed May 12, 2023
1 parent 32cde93 commit 7eb038c
Show file tree
Hide file tree
Showing 9 changed files with 753 additions and 0 deletions.
2 changes: 2 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom

func (s *Server) registerResources() {
catalog.RegisterTypes(s.typeRegistry)
catalog.RegisterControllers(s.controllerManager)

mesh.RegisterTypes(s.typeRegistry)
reaper.RegisterControllers(s.controllerManager)

Expand Down
8 changes: 8 additions & 0 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package catalog

import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
)

Expand Down Expand Up @@ -43,3 +45,9 @@ var (
func RegisterTypes(r resource.Registry) {
types.Register(r)
}

// RegisterControllers registers controllers for the catalog types with
// the given controller Manager.
func RegisterControllers(mgr *controller.Manager) {
controllers.Register(mgr)
}
120 changes: 120 additions & 0 deletions internal/catalog/internal/controllers/nodehealth/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package nodehealth

import (
"context"
"fmt"

"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func NodeHealthController() controller.Controller {
return controller.ForType(types.NodeType).
WithWatch(types.HealthStatusType, controller.MapOwnerFiltered(types.NodeType)).
WithReconciler(&nodeHealthReconciler{})
}

type nodeHealthReconciler struct{}

func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
// The runtime is passed by value so replacing it here for the remaineder of this
// reconciliation request processing will not affect future invocations.
rt.Logger = rt.Logger.With("resource-id", req.ID)

rt.Logger.Trace("reconciling node health")

// read the node
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
switch {
case status.Code(err) == codes.NotFound:
rt.Logger.Trace("node has been deleted")
return nil
case err != nil:
rt.Logger.Error("the resource service has returned an unexpected error", "error", err)
return err
}

res := rsp.Resource

health, err := getNodeHealth(ctx, rt, req.ID)
if err != nil {
rt.Logger.Error("failed to calculate the nodes health", "error", err)
return err
}

message := NodeHealthyMessage
statusState := pbresource.Condition_STATE_TRUE
if health != pbcatalog.Health_HEALTH_PASSING {
statusState = pbresource.Condition_STATE_FALSE
message = NodeUnhealthyMessage
}

newStatus := &pbresource.Status{
ObservedGeneration: res.Generation,
Conditions: []*pbresource.Condition{
{
Type: StatusConditionHealthy,
State: statusState,
Reason: health.String(),
Message: message,
},
},
}

if resource.EqualStatus(res.Status[StatusKey], newStatus, false) {
rt.Logger.Trace("resources node health status is unchanged", "health", health.String())
return nil
}

_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: res.Id,
Key: StatusKey,
Status: newStatus,
})

if err != nil {
rt.Logger.Error("error encountered when attempting to update the resources node health status", "error", err)
return err
}

rt.Logger.Trace("resources node health status was updated", "health", health.String())
return nil
}

func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{
Owner: nodeRef,
})

if err != nil {
return pbcatalog.Health_HEALTH_CRITICAL, err
}

health := pbcatalog.Health_HEALTH_PASSING

for _, res := range rsp.Resources {
if resource.EqualType(res.Id.Type, types.HealthStatusType) {
var hs pbcatalog.HealthStatus
if err := res.Data.UnmarshalTo(&hs); err != nil {
// This should be impossible as the resource service + type validations the
// catalog is performing will ensure that no data gets written where unmarshalling
// to this type will error.
return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("error unmarshalling health status data: %w", err)
}

if hs.Status > health {
health = hs.Status
}
}
}

return health, nil
}
Loading

0 comments on commit 7eb038c

Please sign in to comment.