From cb6830b170a68804956c58d35a9b9cc85278fa5f Mon Sep 17 00:00:00 2001 From: Haleygo Date: Tue, 20 Jun 2023 23:48:50 +0800 Subject: [PATCH] vmsingle: add status field --- api/client/versioned/fake/register.go | 14 ++--- api/client/versioned/scheme/register.go | 14 ++--- api/v1beta1/vmsingle_types.go | 47 ++++++++++++---- api/victoriametrics/v1beta1/vmsingle_types.go | 47 ++++++++++++---- controllers/vmsingle_controller.go | 56 ++++++++++++++++++- internal/config/config.go | 2 +- 6 files changed, 140 insertions(+), 40 deletions(-) diff --git a/api/client/versioned/fake/register.go b/api/client/versioned/fake/register.go index 9450c7044..5d807e168 100644 --- a/api/client/versioned/fake/register.go +++ b/api/client/versioned/fake/register.go @@ -36,14 +36,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. diff --git a/api/client/versioned/scheme/register.go b/api/client/versioned/scheme/register.go index 9e98484e6..def0e44c9 100644 --- a/api/client/versioned/scheme/register.go +++ b/api/client/versioned/scheme/register.go @@ -36,14 +36,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. diff --git a/api/v1beta1/vmsingle_types.go b/api/v1beta1/vmsingle_types.go index 4b8e26dc7..a6b8d6621 100644 --- a/api/v1beta1/vmsingle_types.go +++ b/api/v1beta1/vmsingle_types.go @@ -8,9 +8,20 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + SingleStatusExpanding SingleStatus = "expanding" + SingleStatusOperational SingleStatus = "operational" + SingleStatusFailed SingleStatus = "failed" ) +type SingleStatus string + // VMSingleSpec defines the desired state of VMSingle // +k8s:openapi-gen=true // +kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version",description="The version of VMSingle" @@ -209,17 +220,8 @@ func (cr *VMSingleSpec) UnmarshalJSON(src []byte) error { // VMSingleStatus defines the observed state of VMSingle // +k8s:openapi-gen=true type VMSingleStatus struct { - // ReplicaCount Total number of non-terminated pods targeted by this VMAlert - // cluster (their labels match the selector). - Replicas int32 `json:"replicas"` - // UpdatedReplicas Total number of non-terminated pods targeted by this VMAlert - // cluster that have the desired version spec. - UpdatedReplicas int32 `json:"updatedReplicas"` - // AvailableReplicas Total number of available pods (ready for at least minReadySeconds) - // targeted by this VMAlert cluster. - AvailableReplicas int32 `json:"availableReplicas"` - // UnavailableReplicas Total number of unavailable pods targeted by this VMAlert cluster. - UnavailableReplicas int32 `json:"unavailableReplicas"` + SingleStatus SingleStatus `json:"singleStatus"` + Reason string `json:"reason,omitempty"` } // VMSingle is fast, cost-effective and scalable time-series database. @@ -369,6 +371,29 @@ func (cr *VMSingle) AsCRDOwner() []metav1.OwnerReference { return GetCRDAsOwner(Single) } +// LastAppliedSpecAsPatch return last applied single spec as patch annotation +func (cr *VMSingle) LastAppliedSpecAsPatch() (client.Patch, error) { + data, err := json.Marshal(cr.Spec) + if err != nil { + return nil, fmt.Errorf("possible bug, cannot serialize single specification as json :%w", err) + } + patch := fmt.Sprintf(`{"metadata":{"annotations":{"operator.victoriametrics/last-applied-spec": %q}}}`, data) + return client.RawPatch(types.MergePatchType, []byte(patch)), nil +} + +// GetLastAppliedSpec returns last applied single spec +func (cr *VMSingle) GetLastAppliedSpec() (*VMSingleSpec, error) { + var prevSingleSpec VMSingleSpec + prevSingleJSON := cr.Annotations["operator.victoriametrics/last-applied-spec"] + if prevSingleJSON == "" { + return &prevSingleSpec, nil + } + if err := json.Unmarshal([]byte(prevSingleJSON), &prevSingleSpec); err != nil { + return nil, fmt.Errorf("cannot parse last applied single spec value: %s : %w", prevSingleJSON, err) + } + return &prevSingleSpec, nil +} + func init() { SchemeBuilder.Register(&VMSingle{}, &VMSingleList{}) } diff --git a/api/victoriametrics/v1beta1/vmsingle_types.go b/api/victoriametrics/v1beta1/vmsingle_types.go index 4b8e26dc7..74af30b11 100644 --- a/api/victoriametrics/v1beta1/vmsingle_types.go +++ b/api/victoriametrics/v1beta1/vmsingle_types.go @@ -8,9 +8,20 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + SingleStatusExpanding SingleStatus = "expanding" + SingleStatusOperational SingleStatus = "operational" + SingleStatusFailed SingleStatus = "failed" ) +type SingleStatus string + // VMSingleSpec defines the desired state of VMSingle // +k8s:openapi-gen=true // +kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version",description="The version of VMSingle" @@ -209,17 +220,8 @@ func (cr *VMSingleSpec) UnmarshalJSON(src []byte) error { // VMSingleStatus defines the observed state of VMSingle // +k8s:openapi-gen=true type VMSingleStatus struct { - // ReplicaCount Total number of non-terminated pods targeted by this VMAlert - // cluster (their labels match the selector). - Replicas int32 `json:"replicas"` - // UpdatedReplicas Total number of non-terminated pods targeted by this VMAlert - // cluster that have the desired version spec. - UpdatedReplicas int32 `json:"updatedReplicas"` - // AvailableReplicas Total number of available pods (ready for at least minReadySeconds) - // targeted by this VMAlert cluster. - AvailableReplicas int32 `json:"availableReplicas"` - // UnavailableReplicas Total number of unavailable pods targeted by this VMAlert cluster. - UnavailableReplicas int32 `json:"unavailableReplicas"` + SingleStatus SingleStatus `json:"singleStatus"` + Reason string `json:"reason,omitempty"` } // VMSingle is fast, cost-effective and scalable time-series database. @@ -369,6 +371,29 @@ func (cr *VMSingle) AsCRDOwner() []metav1.OwnerReference { return GetCRDAsOwner(Single) } +// LastAppliedSpecAsPatch return last applied cluster spec as patch annotation +func (cr *VMSingle) LastAppliedSpecAsPatch() (client.Patch, error) { + data, err := json.Marshal(cr.Spec) + if err != nil { + return nil, fmt.Errorf("possible bug, cannot serialize single specification as json :%w", err) + } + patch := fmt.Sprintf(`{"metadata":{"annotations":{"operator.victoriametrics/last-applied-spec": %q}}}`, data) + return client.RawPatch(types.MergePatchType, []byte(patch)), nil +} + +// GetLastAppliedSpec returns last applied single spec +func (cr *VMSingle) GetLastAppliedSpec() (*VMSingleSpec, error) { + var prevSingleSpec VMSingleSpec + prevSingleJSON := cr.Annotations["operator.victoriametrics/last-applied-spec"] + if prevSingleJSON == "" { + return &prevSingleSpec, nil + } + if err := json.Unmarshal([]byte(prevSingleJSON), &prevSingleSpec); err != nil { + return nil, fmt.Errorf("cannot parse last applied single spec value: %s : %w", prevSingleJSON, err) + } + return &prevSingleSpec, nil +} + func init() { SchemeBuilder.Register(&VMSingle{}, &VMSingleList{}) } diff --git a/controllers/vmsingle_controller.go b/controllers/vmsingle_controller.go index 271c2ba88..ec0bf9b94 100644 --- a/controllers/vmsingle_controller.go +++ b/controllers/vmsingle_controller.go @@ -20,19 +20,21 @@ import ( "context" "fmt" - "github.com/VictoriaMetrics/operator/controllers/factory/finalize" - "sigs.k8s.io/controller-runtime/pkg/builder" - "github.com/VictoriaMetrics/operator/controllers/factory" "github.com/VictoriaMetrics/operator/internal/config" "github.com/go-logr/logr" + "github.com/go-test/deep" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" victoriametricsv1beta1 "github.com/VictoriaMetrics/operator/api/v1beta1" + "github.com/VictoriaMetrics/operator/controllers/factory/finalize" ) // VMSingleReconciler reconciles a VMSingle object @@ -73,6 +75,35 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r if instance.Spec.ParsingError != "" { return handleParsingError(instance.Spec.ParsingError, instance) } + lastAppliedSingleSpec, err := instance.GetLastAppliedSpec() + if err != nil { + reqLogger.Error(err, "cannot parse last applied single spec") + } + singleChanges := deep.Equal(lastAppliedSingleSpec, &instance.Spec) + if len(singleChanges) == 0 { + // only update status by deployment pod status if single has no change + var currentDeploy appsv1.Deployment + err := r.Client.Get(ctx, types.NamespacedName{Name: instance.PrefixedName(), Namespace: instance.Namespace}, ¤tDeploy) + if err != nil { + return result, fmt.Errorf("failed to get deployment for vmsingle %s: %w", req.NamespacedName, err) + } + if currentDeploy.Status.AvailableReplicas == currentDeploy.Status.Replicas { + instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusOperational + } else { + instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusFailed + } + if err := r.Client.Status().Update(ctx, instance); err != nil { + return result, fmt.Errorf("cannot update status for vmsingle %s: %w", req.NamespacedName, err) + } + return result, nil + } + if instance.Status.SingleStatus != victoriametricsv1beta1.SingleStatusExpanding { + instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusExpanding + if err := r.Client.Status().Update(ctx, instance); err != nil { + return result, fmt.Errorf("cannot set expanding status for vmsingle %s: %w", req.NamespacedName, err) + } + } + if err := finalize.AddFinalizer(ctx, r.Client, instance); err != nil { return result, err } @@ -90,6 +121,11 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r _, err = factory.CreateOrUpdateVMSingle(ctx, instance, r, r.BaseConf) if err != nil { + instance.Status.Reason = err.Error() + instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusFailed + if err := r.Client.Status().Update(ctx, instance); err != nil { + log.Error(err, "cannot update vmsingle status field", "name", instance.Name, "namespace", instance.Namespace) + } return result, err } @@ -104,6 +140,20 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r reqLogger.Error(err, "cannot create serviceScrape for vmsingle") } } + + specPatch, err := instance.LastAppliedSpecAsPatch() + if err != nil { + return ctrl.Result{}, fmt.Errorf("cannot parse last applied spec for vmsingle %s: %w", req.NamespacedName, err) + } + // use patch instead of update, only 1 field must be changed. + if err := r.Client.Patch(ctx, instance, specPatch); err != nil { + return result, fmt.Errorf("cannot update vmsingle %s with last applied spec: %w", req.NamespacedName, err) + } + + if r.BaseConf.ForceResyncInterval > 0 { + result.RequeueAfter = r.BaseConf.ForceResyncInterval + } + return } diff --git a/internal/config/config.go b/internal/config/config.go index 900018e18..f5b40c3cf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -246,7 +246,7 @@ type BaseOperatorConf struct { PodWaitReadyTimeout time.Duration `default:"80s"` PodWaitReadyIntervalCheck time.Duration `default:"5s"` PodWaitReadyInitDelay time.Duration `default:"10s"` - // configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth + // configures force resync interval for VMAgent, VMAlert, VMAlertmanager, VMAuth and VMsingle. ForceResyncInterval time.Duration `default:"60s"` }