Skip to content

Commit

Permalink
Add the workload health controller
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeler committed May 3, 2023
1 parent da5a915 commit 509271c
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 5 deletions.
2 changes: 1 addition & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom

func (s *Server) registerResources() {
catalog.RegisterTypes(s.typeRegistry)
catalog.RegisterControllers(s.controllerManager)
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())

mesh.RegisterTypes(s.typeRegistry)

Expand Down
13 changes: 11 additions & 2 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package catalog

import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
Expand Down Expand Up @@ -57,8 +58,16 @@ func RegisterTypes(r resource.Registry) {
types.Register(r)
}

type ControllerDependencies = controllers.Dependencies

func DefaultControllerDependencies() ControllerDependencies {
return ControllerDependencies{
WorkloadHealthNodeMapper: workloadhealth.DefaultNodeMapper(),
}
}

// RegisterControllers registers controllers for the catalog types with
// the given controller Manager.
func RegisterControllers(mgr *controller.Manager) {
controllers.Register(mgr)
func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) {
controllers.Register(mgr, deps)
}
10 changes: 8 additions & 2 deletions internal/catalog/internal/controllers/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
package controllers

import (
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
"github.com/hashicorp/consul/internal/controller"
)

func Register(mgr *controller.Manager) {
type Dependencies struct {
WorkloadHealthNodeMapper workloadhealth.NodeMapper
}

func Register(mgr *controller.Manager, deps Dependencies) {
mgr.Register(nodehealth.NodeHealthController())
mgr.Register(workloadhealth.WorkloadHealthController(deps.WorkloadHealthNodeMapper))
}
176 changes: 176 additions & 0 deletions internal/catalog/internal/controllers/workloadhealth/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package workloadhealth

import (
"context"
"fmt"

"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/common"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/types"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

func WorkloadHealthController(nodeMap NodeMapper) controller.Controller {
if nodeMap == nil {
panic("No NodeMapper was provided to the WorkloadHealthController constructor")
}

return controller.ForType(types.WorkloadType).
WithWatch(types.HealthStatusType, common.MapOwnerFiltered(types.GroupName, types.CurrentVersion, types.WorkloadKind)).
WithWatch(types.NodeType, nodeMap.MapNodeToWorkloads).
WithReconciler(&workloadHealthReconciler{nodeMap: nodeMap})
}

type workloadHealthReconciler struct {
nodeMap NodeMapper
}

func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
// read the workload
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
switch {
case status.Code(err) == codes.NotFound:
r.nodeMap.RemoveWorkloadTracking(req.ID)
return nil
case err != nil:
return err
}

res := rsp.Resource
var workload pbcatalog.Workload
if err := res.Data.UnmarshalTo(&workload); err != nil {
return err
}

nodeHealth := pbcatalog.Health_HEALTH_PASSING
if workload.NodeName != "" {
nodeID := r.nodeMap.NodeIDFromWorkload(res, &workload)
r.nodeMap.TrackWorkload(res.Id, nodeID)
nodeHealth, err = r.getNodeHealth(ctx, rt, nodeID)
if err != nil {
return err
}
}

workloadHealth, err := r.getWorkloadHealth(ctx, rt, req.ID)
if err != nil {
return err
}

health := nodeHealth
if workloadHealth > health {
health = workloadHealth
}

statusState := pbresource.Condition_STATE_TRUE
if health != pbcatalog.Health_HEALTH_PASSING {
statusState = pbresource.Condition_STATE_FALSE
}

message := WorkloadHealthyMessage
if workload.NodeName != "" {
message = NodeAndWorkloadHealthyMessage
}
switch {
case workloadHealth != pbcatalog.Health_HEALTH_PASSING && nodeHealth != pbcatalog.Health_HEALTH_PASSING:
message = NodeAndWorkloadUnhealthyMessage
case workloadHealth != pbcatalog.Health_HEALTH_PASSING:
message = WorkloadUnhealthyMessage
case nodeHealth != pbcatalog.Health_HEALTH_PASSING:
message = nodehealth.NodeUnhealthyMessage
}

newStatus := &pbresource.Status{
ObservedGeneration: res.Generation,
Conditions: []*pbresource.Condition{
{
Type: StatusConditionHealthy,
State: statusState,
Reason: health.String(),
Message: message,
},
},
}

if proto.Equal(res.Status[StatusKey], newStatus) {
return nil
}

_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: res.Id,
Key: StatusKey,
Status: newStatus,
})

return err
}

func (r *workloadHealthReconciler) getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: nodeRef})
switch {
case status.Code(err) == codes.NotFound:
return pbcatalog.Health_HEALTH_CRITICAL, nil
case err != nil:
return pbcatalog.Health_HEALTH_CRITICAL, err
default:
healthStatus, ok := rsp.Resource.Status[nodehealth.StatusKey]
if !ok {
// The Nodes health has never been reconciled and therefore the
// workloads health cannot be determined. Returning nil is acceptable
// because the controller should sometime soon run reconciliation for
// the node which will then trigger rereconciliation of this workload
return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("Node health has not been reconciled yet")
}

for _, condition := range healthStatus.Conditions {
if condition.Type == nodehealth.StatusConditionHealthy {
if condition.State == pbresource.Condition_STATE_TRUE {
return pbcatalog.Health_HEALTH_PASSING, nil
}

healthReason, valid := pbcatalog.Health_value[condition.Reason]
if !valid {
// The Nodes health is unknown - presumably the node health controller
// will come along and fix that up momentarily causing this workload
// reconciliation to occur again.
return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("Node health has invalid status")
}
return pbcatalog.Health(healthReason), nil
}
}
return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("Node health has invalid status")
}
}

func (r *workloadHealthReconciler) getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.List(ctx, &pbresource.ListRequest{
Type: types.HealthStatusType,
Tenancy: workloadRef.Tenancy,
})

if err != nil {
return pbcatalog.Health_HEALTH_CRITICAL, err
}

workloadHealth := pbcatalog.Health_HEALTH_PASSING

for _, res := range rsp.Resources {
if proto.Equal(res.Owner, workloadRef) {
var hs pbcatalog.HealthStatus
if err := res.Data.UnmarshalTo(&hs); err != nil {
return workloadHealth, fmt.Errorf("error unmarshalling health status data: %w", err)
}

if hs.Status > workloadHealth {
workloadHealth = hs.Status
}
}
}

return workloadHealth, nil
}
115 changes: 115 additions & 0 deletions internal/catalog/internal/controllers/workloadhealth/node_mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package workloadhealth

import (
"context"
"sync"

"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/protobuf/proto"
)

// The NodeMapper interface is used to provide an implementation around being able to
// map a watch event for a Node resource and translate it to reconciliation requests
// for all Workloads assigned to that node.
type NodeMapper interface {
// MapNodeToWorkloads will take a Node resource and return controller requests
// for all Workloads associated with the Node.
MapNodeToWorkloads(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error)

// TrackWorkload instructs the NodeMapper to associate the given workload
// ID with the given node ID.
TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID)

RemoveWorkloadTracking(workloadID *pbresource.ID)

NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID
}

type nodeMapper struct {
lock sync.Mutex
nodesToWorkloads map[string][]controller.Request
}

func DefaultNodeMapper() NodeMapper {
return &nodeMapper{
nodesToWorkloads: make(map[string][]controller.Request),
}
}

func (m *nodeMapper) NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID {
return &pbresource.ID{
Type: types.NodeV1Alpha1Type,
Tenancy: workload.Id.Tenancy,
Name: workloadData.NodeName,
}
}

func (m *nodeMapper) MapNodeToWorkloads(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
m.lock.Lock()
defer m.lock.Unlock()
return m.nodesToWorkloads[res.Id.Name], nil
}

func (m *nodeMapper) TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) {
m.lock.Lock()
defer m.lock.Unlock()

reqs, ok := m.nodesToWorkloads[nodeID.Name]
if ok {
for _, req := range reqs {
// if the workload already is mapped to the node
if proto.Equal(req.ID, workloadID) {
return
}
}
}

// Check if this workload is being tracked for another node and
// remove the link. This would only occur if the workloads
// associated node name is changed.
m.removeWorkloadTrackingLocked(workloadID)

// Now set up the latest tracking
m.nodesToWorkloads[nodeID.Name] = append(reqs, controller.Request{ID: workloadID})
}

func (m *nodeMapper) RemoveWorkloadTracking(workloadID *pbresource.ID) {
m.lock.Lock()
defer m.lock.Unlock()
m.removeWorkloadTrackingLocked(workloadID)
}

func (m *nodeMapper) removeWorkloadTrackingLocked(workloadID *pbresource.ID) {
// TODO make this not perform in O(<num global workloads>) time
for existingNodeName, workloads := range m.nodesToWorkloads {
foundIdx := -1
for idx, req := range workloads {
// TODO - maybe don't use proto.Equal because it drops to reflection
// for zero gain here
if proto.Equal(req.ID, workloadID) {
foundIdx = idx
break
}
}

// We found the Workload tracked by another node name. This means
// that the Workloads node association is being changed so first
// we must remove the previous association.
if foundIdx != -1 {
l := len(m.nodesToWorkloads[existingNodeName])

if l == 1 {
delete(m.nodesToWorkloads, existingNodeName)
} else if foundIdx == l-1 {
m.nodesToWorkloads[existingNodeName] = workloads[:foundIdx]
} else if foundIdx == 0 {
m.nodesToWorkloads[existingNodeName] = workloads[1:]
} else {
m.nodesToWorkloads[existingNodeName] = append(workloads[:foundIdx], workloads[foundIdx+1:]...)
}
}
}
}
11 changes: 11 additions & 0 deletions internal/catalog/internal/controllers/workloadhealth/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package workloadhealth

const (
StatusKey = "consul.io/workload-health"
StatusConditionHealthy = "healthy"

NodeAndWorkloadHealthyMessage = "All workload and associated node health checks are passing"
WorkloadHealthyMessage = "All workload health checks are passing"
NodeAndWorkloadUnhealthyMessage = "One or more workload and node health checks are not passing"
WorkloadUnhealthyMessage = "One or more workload health checks are not passing"
)

0 comments on commit 509271c

Please sign in to comment.