Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACM-11757: refactor status update management to avoid flapping status #1526

Merged
merged 13 commits into from
Aug 21, 2024
57 changes: 29 additions & 28 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/metricsclient"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/simulator"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/status"
statuslib "github.com/stolostron/multicluster-observability-operator/operators/pkg/status"
)

const (
failedStatusReportMsg = "Failed to report status"
uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092"
)

type RuleMatcher interface {
Expand Down Expand Up @@ -98,7 +100,8 @@ type Worker struct {

status status.StatusReport

metrics *workerMetrics
metrics *workerMetrics
forwardFailures int
}

func CreateFromClient(cfg Config, metrics *workerMetrics, interval time.Duration, name string,
Expand Down Expand Up @@ -297,7 +300,9 @@ func New(cfg Config) (*Worker, error) {
}
w.recordingRules = recordingRules

s, err := status.New(logger)
standalone := os.Getenv("STANDALONE") == "true"
isUwl := strings.Contains(os.Getenv("FROM"), uwlPromURL)
s, err := status.New(logger, standalone, isUwl)
if err != nil {
return nil, fmt.Errorf("unable to create StatusReport: %w", err)
}
Expand Down Expand Up @@ -366,6 +371,21 @@ func (w *Worker) forward(ctx context.Context) error {
w.lock.Lock()
defer w.lock.Unlock()

updateStatus := func(reason statuslib.Reason, message string) {
if reason == statuslib.ForwardFailed {
w.forwardFailures += 1
if w.forwardFailures < 3 {
return
}
}

w.forwardFailures = 0

if err := w.status.UpdateStatus(ctx, reason, message); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
}

var families []*clientmodel.MetricFamily
var err error
if w.simulatedTimeseriesFile != "" {
Expand All @@ -378,19 +398,13 @@ func (w *Worker) forward(ctx context.Context) error {
} else {
families, err = w.getFederateMetrics(ctx)
if err != nil {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to retrieve metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to retrieve metrics")
return err
}

rfamilies, err := w.getRecordingMetrics(ctx)
if err != nil && len(rfamilies) == 0 {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to retrieve recording metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to retrieve recording metrics")
return err
} else {
families = append(families, rfamilies...)
Expand All @@ -399,10 +413,7 @@ func (w *Worker) forward(ctx context.Context) error {

before := metricfamily.MetricsCount(families)
if err := metricfamily.Filter(families, w.transformer); err != nil {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to filter metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to filter metrics")
return err
}

Expand All @@ -416,34 +427,24 @@ func (w *Worker) forward(ctx context.Context) error {

if len(families) == 0 {
rlogger.Log(w.logger, rlogger.Warn, "msg", "no metrics to send, doing nothing")
statusErr := w.status.UpdateStatus(ctx, "Available", "No metrics to send")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardSuccessful, "No metrics to send")
return nil
}

if w.to == nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", "to is nil, doing nothing")
statusErr := w.status.UpdateStatus(ctx, "Available", "Metrics is not required to send")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardSuccessful, "Metrics is not required to send")
return nil
}

req := &http.Request{Method: "POST", URL: w.to}
if err := w.toClient.RemoteWrite(ctx, req, families, w.interval); err != nil {
if err := w.status.UpdateStatus(ctx, "Degraded", "Failed to send metrics"); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
updateStatus(statuslib.ForwardFailed, "Failed to send metrics")
return err
}

if w.simulatedTimeseriesFile == "" {
if err := w.status.UpdateStatus(ctx, "Available", "Cluster metrics sent successfully"); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
updateStatus(statuslib.ForwardSuccessful, "Cluster metrics sent successfully")
} else {
rlogger.Log(w.logger, rlogger.Warn, "msg", "Simulated metrics sent successfully")
}
Expand Down
147 changes: 31 additions & 116 deletions collectors/metrics/pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,35 @@ package status
import (
"context"
"errors"
"fmt"
"log/slog"
philipgough marked this conversation as resolved.
Show resolved Hide resolved
"os"
"slices"
"sort"
"strings"
"time"

"github.com/go-kit/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1"
"github.com/stolostron/multicluster-observability-operator/operators/pkg/status"
)

const (
name = "observability-addon"
namespace = "open-cluster-management-addon-observability"
uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092"
addonName = "observability-addon"
addonNamespace = "open-cluster-management-addon-observability"
)

type StatusReport struct {
statusClient client.Client
logger log.Logger
statusClient client.Client
standalone bool
isUwl bool
statusReporter status.Status
logger log.Logger
}

func New(logger log.Logger) (*StatusReport, error) {
func New(logger log.Logger, standalone, isUwl bool) (*StatusReport, error) {
testMode := os.Getenv("UNIT_TEST") != ""
standaloneMode := os.Getenv("STANDALONE") == "true"
var kubeClient client.Client
if testMode {
s := scheme.Scheme
Expand All @@ -50,8 +46,6 @@ func New(logger log.Logger) (*StatusReport, error) {
WithScheme(s).
WithStatusSubresource(&oav1beta1.ObservabilityAddon{}).
Build()
} else if standaloneMode {
kubeClient = nil
} else {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
Expand All @@ -67,114 +61,35 @@ func New(logger log.Logger) (*StatusReport, error) {
}
}

logger.Log("msg", "Creating status client", "standalone", standalone, "isUwl", isUwl)

statusLogger := logr.FromSlogHandler(slog.New(slog.NewTextHandler(os.Stdout, nil)).With("component", "statusclient").Handler())
return &StatusReport{
statusClient: kubeClient,
logger: log.With(logger, "component", "statusclient"),
statusClient: kubeClient,
standalone: standalone,
isUwl: isUwl,
statusReporter: status.NewStatus(kubeClient, addonName, addonNamespace, statusLogger),
logger: logger,
}, nil
}

func (s *StatusReport) UpdateStatus(ctx context.Context, t string, m string) error {
// statusClient is nil when running on the hub.
if s.statusClient == nil {
return nil
}

isUwl := false
if strings.Contains(os.Getenv("FROM"), uwlPromURL) {
isUwl = true
}

retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
addon := &oav1beta1.ObservabilityAddon{}
err := s.statusClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, addon)
if err != nil {
return fmt.Errorf("failed to get ObservabilityAddon %s/%s: %w", namespace, name, err)
}

// Sort the conditions by rising LastTransitionTime
sort.Slice(addon.Status.Conditions, func(i, j int) bool {
return addon.Status.Conditions[i].LastTransitionTime.Before(&addon.Status.Conditions[j].LastTransitionTime)
})

currentCondition := addon.Status.Conditions[len(addon.Status.Conditions)-1]
newCondition := mergeCondtion(isUwl, m, currentCondition)

// If the current condition is the same, do not update
if currentCondition.Type == newCondition.Type && currentCondition.Reason == newCondition.Reason && currentCondition.Message == newCondition.Message && currentCondition.Status == newCondition.Status {
return nil
}

s.logger.Log("msg", fmt.Sprintf("Updating status of ObservabilityAddon %s/%s", namespace, name), "type", newCondition.Type, "status", newCondition.Status, "reason", newCondition.Reason)

// Reset the status of other main conditions
for i := range addon.Status.Conditions {
if slices.Contains([]string{"Available", "Degraded", "Progressing"}, addon.Status.Conditions[i].Type) {
addon.Status.Conditions[i].Status = metav1.ConditionFalse
}
}

// Set the new condition
addon.Status.Conditions = mutateOrAppend(addon.Status.Conditions, newCondition)

if err := s.statusClient.Status().Update(ctx, addon); err != nil {
return fmt.Errorf("failed to update ObservabilityAddon %s/%s: %w", namespace, name, err)
}

func (s *StatusReport) UpdateStatus(ctx context.Context, reason status.Reason, message string) error {
// Standalone mode is set when running on the hub cluster
// In this case, we do not need to update the status of the ObservabilityAddon
if s.standalone {
return nil
})
if retryErr != nil {
return retryErr
}
return nil
}

func mergeCondtion(isUwl bool, m string, condition oav1beta1.StatusCondition) oav1beta1.StatusCondition {
messages := strings.Split(condition.Message, " ; ")
if len(messages) == 1 {
messages = append(messages, "")
}
if isUwl {
messages[1] = fmt.Sprintf("User Workload: %s", m)
} else {
messages[0] = m
component := status.MetricsCollector
if s.isUwl {
component = status.UwlMetricsCollector
}
message := messages[0]
if messages[1] != "" {
message = strings.Join(messages, " ; ")
}
conditionType := "Available"
reason := "Available"
if strings.Contains(message, "Failed") {
conditionType = "Degraded"
reason = "Degraded"
}
return oav1beta1.StatusCondition{
Type: conditionType,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
LastTransitionTime: metav1.NewTime(time.Now()),
}
}

// mutateOrAppend updates the status conditions with the new condition.
// If the condition already exists, it updates it with the new condition.
// If the condition does not exist, it appends the new condition to the status conditions.
func mutateOrAppend(conditions []oav1beta1.StatusCondition, newCondition oav1beta1.StatusCondition) []oav1beta1.StatusCondition {
if len(conditions) == 0 {
return []oav1beta1.StatusCondition{newCondition}
if wasReported, err := s.statusReporter.UpdateComponentCondition(ctx, component, reason, message); err != nil {
return err
} else if wasReported {
s.logger.Log("msg", "Status updated", "component", component, "reason", reason, "message", message)
}

for i, condition := range conditions {
if condition.Type == newCondition.Type {
// Update the existing condition
conditions[i] = newCondition
return conditions
}
}
// If the condition type does not exist, append the new condition
return append(conditions, newCondition)
return nil
}
Loading
Loading