Skip to content

Commit

Permalink
Add the workload health controller
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeler committed May 10, 2023
1 parent 8d05b6c commit a5f7ec5
Show file tree
Hide file tree
Showing 8 changed files with 1,264 additions and 4 deletions.
2 changes: 1 addition & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom

func (s *Server) registerResources() {
catalog.RegisterTypes(s.typeRegistry)
catalog.RegisterControllers(s.controllerManager)
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())

mesh.RegisterTypes(s.typeRegistry)
reaper.RegisterControllers(s.controllerManager)
Expand Down
13 changes: 11 additions & 2 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package catalog

import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
Expand Down Expand Up @@ -46,8 +47,16 @@ func RegisterTypes(r resource.Registry) {
types.Register(r)
}

type ControllerDependencies = controllers.Dependencies

func DefaultControllerDependencies() ControllerDependencies {
return ControllerDependencies{
WorkloadHealthNodeMapper: nodemapper.New(),
}
}

// RegisterControllers registers controllers for the catalog types with
// the given controller Manager.
func RegisterControllers(mgr *controller.Manager) {
controllers.Register(mgr)
func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) {
controllers.Register(mgr, deps)
}
8 changes: 7 additions & 1 deletion internal/catalog/internal/controllers/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ package controllers

import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
"github.com/hashicorp/consul/internal/controller"
)

func Register(mgr *controller.Manager) {
type Dependencies struct {
WorkloadHealthNodeMapper workloadhealth.NodeMapper
}

func Register(mgr *controller.Manager, deps Dependencies) {
mgr.Register(nodehealth.NodeHealthController())
mgr.Register(workloadhealth.WorkloadHealthController(deps.WorkloadHealthNodeMapper))
}
211 changes: 211 additions & 0 deletions internal/catalog/internal/controllers/workloadhealth/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package workloadhealth

import (
"context"
"errors"
"fmt"

"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

var (
errNodeUnreconciled = errors.New("Node health has not been reconciled yet")
errNodeHealthInvalid = errors.New("Node health has invalid reason")
errNodeHealthConditionNotFound = fmt.Errorf("Node health status is missing the %s condition", nodehealth.StatusConditionHealthy)
)

// The NodeMapper interface is used to provide an implementation around being able to
// map a watch event for a Node resource and translate it to reconciliation requests
// for all Workloads assigned to that node.
type NodeMapper interface {
// MapNodeToWorkloads will take a Node resource and return controller requests
// for all Workloads associated with the Node.
MapNodeToWorkloads(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error)

// TrackWorkload instructs the NodeMapper to associate the given workload
// ID with the given node ID.
TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID)

RemoveWorkloadTracking(workloadID *pbresource.ID)

NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID
}

func WorkloadHealthController(nodeMap NodeMapper) controller.Controller {
if nodeMap == nil {
panic("No NodeMapper was provided to the WorkloadHealthController constructor")
}

return controller.ForType(types.WorkloadType).
WithWatch(types.HealthStatusType, controller.MapOwnerFiltered(types.WorkloadType)).
WithWatch(types.NodeType, nodeMap.MapNodeToWorkloads).
WithReconciler(&workloadHealthReconciler{nodeMap: nodeMap})
}

type workloadHealthReconciler struct {
nodeMap NodeMapper
}

func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
// read the workload
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
switch {
case status.Code(err) == codes.NotFound:
r.nodeMap.RemoveWorkloadTracking(req.ID)
return nil
case err != nil:
return err
}

res := rsp.Resource
var workload pbcatalog.Workload
if err := res.Data.UnmarshalTo(&workload); err != nil {
// This should be impossible and will not be exercised in tests. Various
// type validations on admission ensure that all Workloads would
// be unmarshallable in this way.
return err
}

nodeHealth := pbcatalog.Health_HEALTH_PASSING
if workload.NodeName != "" {
nodeID := r.nodeMap.NodeIDFromWorkload(res, &workload)
r.nodeMap.TrackWorkload(res.Id, nodeID)
nodeHealth, err = getNodeHealth(ctx, rt, nodeID)
if err != nil {
return err
}
} else {
// the node association may be been removed so stop tracking it.
r.nodeMap.RemoveWorkloadTracking(res.Id)
}

workloadHealth, err := getWorkloadHealth(ctx, rt, req.ID)
if err != nil {
// This should be impossible under normal operations and will not be exercised
// within the unit tests. This can only fail if the resource service fails
// or allows admission of invalid health statuses.
return err
}

health := nodeHealth
if workloadHealth > health {
health = workloadHealth
}

statusState := pbresource.Condition_STATE_TRUE
if health != pbcatalog.Health_HEALTH_PASSING {
statusState = pbresource.Condition_STATE_FALSE
}

message := WorkloadHealthyMessage
if workload.NodeName != "" {
message = NodeAndWorkloadHealthyMessage
}
switch {
case workloadHealth != pbcatalog.Health_HEALTH_PASSING && nodeHealth != pbcatalog.Health_HEALTH_PASSING:
message = NodeAndWorkloadUnhealthyMessage
case workloadHealth != pbcatalog.Health_HEALTH_PASSING:
message = WorkloadUnhealthyMessage
case nodeHealth != pbcatalog.Health_HEALTH_PASSING:
message = nodehealth.NodeUnhealthyMessage
}

newStatus := &pbresource.Status{
ObservedGeneration: res.Generation,
Conditions: []*pbresource.Condition{
{
Type: StatusConditionHealthy,
State: statusState,
Reason: health.String(),
Message: message,
},
},
}

if resource.EqualStatus(res.Status[StatusKey], newStatus, false) {
return nil
}

_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: res.Id,
Key: StatusKey,
Status: newStatus,
})

return err
}

func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: nodeRef})
switch {
case status.Code(err) == codes.NotFound:
return pbcatalog.Health_HEALTH_CRITICAL, nil
case err != nil:
return pbcatalog.Health_HEALTH_CRITICAL, err
default:
healthStatus, ok := rsp.Resource.Status[nodehealth.StatusKey]
if !ok {
// The Nodes health has never been reconciled and therefore the
// workloads health cannot be determined. Returning nil is acceptable
// because the controller should sometime soon run reconciliation for
// the node which will then trigger rereconciliation of this workload
return pbcatalog.Health_HEALTH_CRITICAL, errNodeUnreconciled
}

for _, condition := range healthStatus.Conditions {
if condition.Type == nodehealth.StatusConditionHealthy {
if condition.State == pbresource.Condition_STATE_TRUE {
return pbcatalog.Health_HEALTH_PASSING, nil
}

healthReason, valid := pbcatalog.Health_value[condition.Reason]
if !valid {
// The Nodes health is unknown - presumably the node health controller
// will come along and fix that up momentarily causing this workload
// reconciliation to occur again.
return pbcatalog.Health_HEALTH_CRITICAL, errNodeHealthInvalid
}
return pbcatalog.Health(healthReason), nil
}
}
return pbcatalog.Health_HEALTH_CRITICAL, errNodeHealthConditionNotFound
}
}

func getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{
Owner: workloadRef,
})

if err != nil {
return pbcatalog.Health_HEALTH_CRITICAL, err
}

workloadHealth := pbcatalog.Health_HEALTH_PASSING

for _, res := range rsp.Resources {
if proto.Equal(res.Id.Type, types.HealthStatusType) {
var hs pbcatalog.HealthStatus
if err := res.Data.UnmarshalTo(&hs); err != nil {
// This should be impossible and will not be executing in tests. The resource type
// is the HealthStatus type and therefore must be unmarshallable into the HealthStatus
// object or else it wouldn't have passed admission validation checks.
return workloadHealth, fmt.Errorf("error unmarshalling health status data: %w", err)
}

if hs.Status > workloadHealth {
workloadHealth = hs.Status
}
}
}

return workloadHealth, nil
}
Loading

0 comments on commit a5f7ec5

Please sign in to comment.