Skip to content

Commit

Permalink
add status update
Browse files Browse the repository at this point in the history
Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>
  • Loading branch information
thibaultmg committed Jul 8, 2024
1 parent a3a360f commit 516969b
Show file tree
Hide file tree
Showing 5 changed files with 795 additions and 393 deletions.
276 changes: 256 additions & 20 deletions operators/endpointmetrics/controllers/status/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,83 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/go-logr/logr"
"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/status"
"github.com/stolostron/multicluster-observability-operator/operators/endpointmetrics/pkg/util"
oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1"
)

// conditionType names one of the standard, aggregated conditions that ACM
// expects to find in the ObservabilityAddon status.
type conditionType string

// Standard condition types written to the ObservabilityAddon status.
const (
	// Available is the standard "Available" condition type.
	Available = conditionType("Available")
	// Progressing is the standard "Progressing" condition type.
	Progressing = conditionType("Progressing")
	// Degraded is the standard "Degraded" condition type.
	Degraded = conditionType("Degraded")
)

// componentsMap is the set of condition types reported by individual components.
// Conditions whose type is in this set are the ones aggregated into the
// standard conditions.
var componentsMap = map[string]struct{}{
	string(status.MetricsCollector):    {},
	string(status.UwlMetricsCollector): {},
}

// reason maps individual component reasons to standard types and assigns a priority to each reason.
// The priority is used to aggregate the conditions of the components into a single condition.
// NOTE: newReason builds values with unkeyed literals, so the field order below must not change.
type reason struct {
// reason is the component reason string, kept as received.
reason string
// priority ranks the reason during aggregation; newReason assigns -1 to reasons it does not recognise.
priority int
// stdType is the standard condition type this reason maps to.
stdType conditionType
}

// newReason builds the reason descriptor for the component reason s.
// Recognised reasons get a positive priority and their standard condition
// type; any other value keeps the fallback of priority -1 and type Degraded.
// The reason string itself is always preserved as given.
func newReason(s string) reason {
	// Start from the fallback used for unrecognised reasons.
	res := reason{reason: s, priority: -1, stdType: Degraded}

	switch s {
	case string(status.ForwardSuccessful):
		res.priority, res.stdType = 1, Available
	case string(status.UpdateSuccessful):
		res.priority, res.stdType = 2, Progressing
	case string(status.ForwardFailed):
		res.priority, res.stdType = 3, Degraded
	case string(status.UpdateFailed):
		res.priority, res.stdType = 4, Degraded
	case string(status.Disabled):
		res.priority, res.stdType = 5, Degraded
	case string(status.NotSupported):
		res.priority, res.stdType = 6, Degraded
	}

	return res
}

// String returns the component reason string.
func (r reason) String() string {
	return r.reason
}

// Priority returns the rank of the reason used when aggregating conditions.
func (r reason) Priority() int {
	return r.priority
}

// StdType returns the standard condition type the reason maps to.
func (r reason) StdType() conditionType {
	return r.stdType
}

// StatusReconciler reconciles status object.
type StatusReconciler struct {
Client client.Client
Expand All @@ -35,13 +96,66 @@ type StatusReconciler struct {
Logger logr.Logger
}

// Reconcile aggregates the component conditions of the ObservabilityAddon into
// the standard conditions and propagates the status, first in the local (spoke)
// cluster and then in the hub cluster. Processing stops at the first step that
// reports an error or a non-zero result.
// It returns:
//   - a TerminalError if the reconciliation fails and no requeue is needed
//   - a non terminal error if the reconciliation fails and a requeue is needed
//   - a result with RequeueAfter set if the reconciliation fails and a requeue with delay is needed
func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	r.Logger.WithValues("Request", req.String()).Info("Reconciling")

	// The spoke status must be settled before it is mirrored to the hub.
	steps := []func(context.Context) (ctrl.Result, error){
		r.updateSpokeAddon,
		r.updateHubAddon,
	}
	for _, step := range steps {
		if res, err := step(ctx); err != nil || !res.IsZero() {
			return res, err
		}
	}

	return ctrl.Result{}, nil
}

// updateSpokeAddon recomputes the standard condition of the ObservabilityAddon
// in the local (spoke) cluster from its component conditions and writes it to
// the status when it changed. The read-modify-write cycle is retried on
// conflict.
// It returns:
//   - a zero result and nil error when nothing had to be done or the update succeeded
//   - the outcome of requeueWithOptionalDelay for conflict or transient errors
//   - a TerminalError for any other error
func (r *StatusReconciler) updateSpokeAddon(ctx context.Context) (ctrl.Result, error) {
	retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Fetch the ObservabilityAddon instance in local cluster.
		// It is re-read on every attempt so a retry works on the latest version.
		obsAddon := &oav1beta1.ObservabilityAddon{}
		if err := r.Client.Get(ctx, types.NamespacedName{Name: r.ObsAddonName, Namespace: r.Namespace}, obsAddon); err != nil {
			return err
		}

		// No component condition reported yet: nothing to aggregate.
		addonNewCondition := aggregateComponentsConditions(obsAddon.Status.Conditions)
		if addonNewCondition == nil {
			return nil
		}

		// Skip the write when the aggregated condition is already the current one.
		if !shouldUpdateConditions(obsAddon.Status.Conditions, *addonNewCondition) {
			return nil
		}

		// Set every standard condition to False, then store the new one.
		obsAddon.Status.Conditions = resetMainConditionsStatus(obsAddon.Status.Conditions)
		obsAddon.Status.Conditions = mutateOrAppend(obsAddon.Status.Conditions, *addonNewCondition)

		r.Logger.Info(fmt.Sprintf("Updating status of ObservabilityAddon %s/%s", obsAddon.Namespace, obsAddon.Name), "type", addonNewCondition.Type, "reason", addonNewCondition.Reason)

		return r.Client.Status().Update(ctx, obsAddon)
	})

	if retryErr != nil {
		if errors.IsConflict(retryErr) || isTransientErr(retryErr) {
			return r.requeueWithOptionalDelay(fmt.Errorf("failed to update status in spoke cluster with retryable error: %w", retryErr))
		}
		return ctrl.Result{}, reconcile.TerminalError(retryErr)
	}

	return ctrl.Result{}, nil
}

func (r *StatusReconciler) updateHubAddon(ctx context.Context) (ctrl.Result, error) {
// Fetch the ObservabilityAddon instance in hub cluster
hubObsAddon := &oav1beta1.ObservabilityAddon{}
err := r.HubClient.Get(ctx, types.NamespacedName{Name: r.ObsAddonName, Namespace: r.HubNamespace}, hubObsAddon)
Expand All @@ -50,18 +164,16 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// Try reloading the kubeconfig for the hub cluster
var reloadErr error
if r.HubClient, reloadErr = r.HubClient.Reload(); reloadErr != nil {
return ctrl.Result{}, fmt.Errorf("failed to reload the hub client: %w", reloadErr)
return ctrl.Result{}, reconcile.TerminalError(fmt.Errorf("failed to reload the hub client: %w", reloadErr))
}
r.Logger.Info("Failed to get ObservabilityAddon in hub cluster, reloaded hub, requeue with delay", "error", err)
return ctrl.Result{Requeue: true}, nil
return ctrl.Result{}, fmt.Errorf("failed to get ObservabilityAddon in hub cluster, reloaded hub client: %w", err)
}

if isTransientErr(err) {
r.Logger.Info("Failed to get ObservabilityAddon in hub cluster, requeue with delay", "error", err)
return requeueWithOptionalDelay(err), nil
return r.requeueWithOptionalDelay(fmt.Errorf("failed to get ObservabilityAddon in hub cluster with retryable error: %w", err))
}

return ctrl.Result{}, err
return ctrl.Result{}, reconcile.TerminalError(fmt.Errorf("failed to get ObservabilityAddon in hub cluster: %w", err))
}

// Retry on conflict as operation happens in other cluster
Expand All @@ -86,16 +198,27 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
})
if retryErr != nil {
if isTransientErr(retryErr) || errors.IsConflict(retryErr) {
r.Logger.Info("Retryable error while updating status, request will be retried.", "error", retryErr)
return requeueWithOptionalDelay(retryErr), nil
return r.requeueWithOptionalDelay(fmt.Errorf("failed to update status in hub cluster with retryable error: %w", retryErr))
}

return ctrl.Result{}, fmt.Errorf("failed to update status in hub cluster: %w", retryErr)
return ctrl.Result{}, reconcile.TerminalError(fmt.Errorf("failed to update status in hub cluster: %w", retryErr))
}

return ctrl.Result{}, nil
}

// requeueWithOptionalDelay turns a retryable error into a requeue decision.
// When the error suggests a client delay, the delay is logged and returned as
// RequeueAfter with a nil error. Otherwise the error itself is returned with a
// zero result, so the runtime requeues the request without a delay.
func (r *StatusReconciler) requeueWithOptionalDelay(err error) (ctrl.Result, error) {
	delay, ok := errors.SuggestsClientDelay(err)
	if !ok {
		return ctrl.Result{}, err
	}

	r.Logger.Info("Requeue with delay", "error", err, "delay", delay)

	return ctrl.Result{RequeueAfter: time.Duration(delay) * time.Second}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StatusReconciler) SetupWithManager(mgr ctrl.Manager) error {
pred := predicate.Funcs{
Expand Down Expand Up @@ -151,12 +274,125 @@ func isAuthOrConnectionErr(err error) bool {
return false
}

// requeueWithOptionalDelay requeues the request with a delay if suggested by the error
// Otherwise, it requeues the request without a delay
func requeueWithOptionalDelay(err error) ctrl.Result {
if delay, ok := errors.SuggestsClientDelay(err); ok {
return ctrl.Result{RequeueAfter: time.Duration(delay) * time.Second}
// mutateOrAppend stores newCondition in conditions and returns the resulting slice.
// The first entry with the same Type is overwritten in place; when no entry has
// that Type, newCondition is appended. An empty input yields a fresh
// single-element slice.
func mutateOrAppend(conditions []oav1beta1.StatusCondition, newCondition oav1beta1.StatusCondition) []oav1beta1.StatusCondition {
	if len(conditions) == 0 {
		return []oav1beta1.StatusCondition{newCondition}
	}

	for idx := range conditions {
		if conditions[idx].Type != newCondition.Type {
			continue
		}
		// Same type already present: replace it.
		conditions[idx] = newCondition
		return conditions
	}

	// No entry of that type yet.
	return append(conditions, newCondition)
}

// shouldUpdateConditions reports whether newCondition differs from the current
// standard condition found in conditions, i.e. whether the status needs to be
// updated.
//
// Only the standard condition types (Available, Progressing, Degraded) are
// considered. The current condition is the one with Status True; when several
// share the same truthiness, the one with the latest LastTransitionTime wins.
// It returns true when there is no standard condition yet, or when the current
// one differs from newCondition in Type, Status, Reason or Message.
func shouldUpdateConditions(conditions []oav1beta1.StatusCondition, newCondition oav1beta1.StatusCondition) bool {
	// Keep the standard conditions only; component conditions are ignored here.
	var standard []oav1beta1.StatusCondition
	for _, condition := range conditions {
		switch conditionType(condition.Type) {
		case Available, Progressing, Degraded:
			standard = append(standard, condition)
		}
	}

	if len(standard) == 0 {
		return true
	}

	// Order so that the current condition ends up last: conditions that are not
	// True first, then by ascending transition time. Status is compared before
	// time in both directions so that the comparator is a consistent ordering.
	sort.SliceStable(standard, func(i, j int) bool {
		iTrue := standard[i].Status == metav1.ConditionTrue
		jTrue := standard[j].Status == metav1.ConditionTrue
		if iTrue != jTrue {
			return jTrue
		}
		return standard[i].LastTransitionTime.Before(&standard[j].LastTransitionTime)
	})

	current := standard[len(standard)-1]

	return current.Type != newCondition.Type ||
		current.Status != newCondition.Status ||
		current.Reason != newCondition.Reason ||
		current.Message != newCondition.Message
}

return ctrl.Result{Requeue: true}
// aggregateComponentsConditions aggregates the conditions of the components into a single
// standard condition. It returns nil when conditions holds no component condition.
//
// The Type and Reason of the result come from the component condition whose reason has the
// highest priority (ties are broken by condition type). A reason that newReason does not
// recognise is still reported and maps to the type newReason assigns to it.
// The Status is True only if every component condition is True.
// The Message concatenates "<type>: <message>" of every component condition, in priority
// order, separated by "; " and truncated to 256 bytes.
func aggregateComponentsConditions(conditions []oav1beta1.StatusCondition) *oav1beta1.StatusCondition {
	// Filter out standard conditions
	var components []oav1beta1.StatusCondition
	for _, condition := range conditions {
		if _, ok := componentsMap[condition.Type]; ok {
			components = append(components, condition)
		}
	}

	if len(components) == 0 {
		return nil
	}

	// Sort the conditions by decreasing priority of the reason
	// If same priority, order by the type of the condition
	sort.Slice(components, func(i, j int) bool {
		pi := newReason(components[i].Reason).Priority()
		pj := newReason(components[j].Reason).Priority()
		if pi == pj {
			return components[i].Type < components[j].Type
		}
		return pi > pj
	})

	// The first element now carries the highest priority reason. Taking it
	// directly guarantees that Type and Reason are always set, even when no
	// reason is recognised.
	aggregatedCondition := &oav1beta1.StatusCondition{
		Type:               string(newReason(components[0].Reason).StdType()),
		Status:             metav1.ConditionTrue,
		Reason:             components[0].Reason,
		LastTransitionTime: metav1.Now(),
	}

	messages := make([]string, 0, len(components))
	for _, condition := range components {
		// A single component that is not True makes the aggregate False.
		if condition.Status != metav1.ConditionTrue {
			aggregatedCondition.Status = metav1.ConditionFalse
		}
		messages = append(messages, fmt.Sprintf("%s: %s", condition.Type, condition.Message))
	}
	aggregatedCondition.Message = strings.Join(messages, "; ")

	// truncate the message if it exceeds the limit
	const limit = 256
	if len(aggregatedCondition.Message) > limit {
		cut := limit - 3
		// Do not split a multi-byte UTF-8 sequence: step back while the first
		// dropped byte is a continuation byte (10xxxxxx).
		for cut > 0 && aggregatedCondition.Message[cut]&0xC0 == 0x80 {
			cut--
		}
		aggregatedCondition.Message = aggregatedCondition.Message[:cut] + "..."
	}

	return aggregatedCondition
}

// resetMainConditionsStatus sets the Status of every standard condition
// (Available, Degraded, Progressing) to False. The slice is modified in place
// and returned; conditions of any other type are left untouched.
func resetMainConditionsStatus(conditions []oav1beta1.StatusCondition) []oav1beta1.StatusCondition {
	for idx := range conditions {
		switch conditions[idx].Type {
		case string(Available), string(Degraded), string(Progressing):
			conditions[idx].Status = metav1.ConditionFalse
		}
	}

	return conditions
}
Loading

0 comments on commit 516969b

Please sign in to comment.