Skip to content

Commit

Permalink
Add a Node health controller
Browse files Browse the repository at this point in the history
This will aggregate all HealthStatus objects owned by the Node and update the status of the Node with an overall health.
  • Loading branch information
mkeeler committed May 10, 2023
1 parent 8a6ebb9 commit 17ee0be
Show file tree
Hide file tree
Showing 9 changed files with 661 additions and 0 deletions.
2 changes: 2 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom

func (s *Server) registerResources() {
catalog.RegisterTypes(s.typeRegistry)
catalog.RegisterControllers(s.controllerManager)

mesh.RegisterTypes(s.typeRegistry)
reaper.RegisterControllers(s.controllerManager)

Expand Down
8 changes: 8 additions & 0 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package catalog

import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
)

Expand Down Expand Up @@ -43,3 +45,9 @@ var (
func RegisterTypes(r resource.Registry) {
types.Register(r)
}

// RegisterControllers registers controllers for the catalog types with
// the given controller Manager.
func RegisterControllers(mgr *controller.Manager) {
controllers.Register(mgr)
}
62 changes: 62 additions & 0 deletions internal/catalog/internal/controllers/ctltest/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ctltest

import (
"context"
"testing"

"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"
)

type resourceBuilder struct {
resource *pbresource.Resource
}

func Resource(rtype *pbresource.Type, name string) *resourceBuilder {
return &resourceBuilder{
resource: &pbresource.Resource{
Id: &pbresource.ID{
Type: &pbresource.Type{
Group: rtype.Group,
GroupVersion: rtype.GroupVersion,
Kind: rtype.Kind,
},
Tenancy: &pbresource.Tenancy{
Partition: "default",
Namespace: "default",
PeerName: "local",
},
Name: name,
},
},
}
}

func (b *resourceBuilder) WithData(t *testing.T, data protoreflect.ProtoMessage) *resourceBuilder {
anyData, err := anypb.New(data)
require.NoError(t, err)
b.resource.Data = anyData
return b
}

func (b *resourceBuilder) WithOwner(id *pbresource.ID) *resourceBuilder {
b.resource.Owner = id
return b
}

func (b *resourceBuilder) Build() *pbresource.Resource {
return b.resource
}

func (b *resourceBuilder) Write(t *testing.T, client pbresource.ResourceServiceClient) *pbresource.Resource {
res := b.Build()

rsp, err := client.Write(context.Background(), &pbresource.WriteRequest{
Resource: res,
})

require.NoError(t, err)
return rsp.Resource
}
104 changes: 104 additions & 0 deletions internal/catalog/internal/controllers/nodehealth/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package nodehealth

import (
"context"
"fmt"

"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

func NodeHealthController() controller.Controller {
return controller.ForType(types.NodeType).
WithWatch(types.HealthStatusType, controller.MapOwnerFiltered(types.NodeType)).
WithReconciler(&nodeHealthReconciler{})
}

type nodeHealthReconciler struct{}

func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
// read the workload
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
switch {
case status.Code(err) == codes.NotFound:
return nil
case err != nil:
return err
}

res := rsp.Resource

health, err := getNodeHealth(ctx, rt, req.ID)
if err != nil {
return err
}

message := NodeHealthyMessage
statusState := pbresource.Condition_STATE_TRUE
if health != pbcatalog.Health_HEALTH_PASSING {
statusState = pbresource.Condition_STATE_FALSE
message = NodeUnhealthyMessage
}

newStatus := &pbresource.Status{
ObservedGeneration: res.Generation,
Conditions: []*pbresource.Condition{
{
Type: StatusConditionHealthy,
State: statusState,
Reason: health.String(),
Message: message,
},
},
}

if proto.Equal(res.Status[StatusKey], newStatus) {
return nil
}

_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: res.Id,
Key: StatusKey,
Status: newStatus,
})

return err
}

func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{
Owner: nodeRef,
})

if err != nil {
return pbcatalog.Health_HEALTH_CRITICAL, err
}

health := pbcatalog.Health_HEALTH_PASSING

for _, res := range rsp.Resources {
if proto.Equal(res.Id.Type, types.HealthStatusType) {
var hs pbcatalog.HealthStatus
if err := res.Data.UnmarshalTo(&hs); err != nil {
// This should be impossible as the resource service + type validations the
// catalog is performing will ensure that no data gets written where unmarshalling
// to this type will error.
return health, fmt.Errorf("error unmarshalling health status data: %w", err)
}

if hs.Status > health {
health = hs.Status
}
}
}

return health, nil
}
Loading

0 comments on commit 17ee0be

Please sign in to comment.