Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vmsingle: add status field #673

Merged
merged 4 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions api/v1beta1/vmsingle_types.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
package v1beta1

import (
"bytes"
"encoding/json"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
SingleStatusExpanding SingleStatus = "expanding"
SingleStatusOperational SingleStatus = "operational"
SingleStatusFailed SingleStatus = "failed"
)

type SingleStatus string

// VMSingleSpec defines the desired state of VMSingle
// +k8s:openapi-gen=true
// +kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version",description="The version of VMSingle"
Expand Down Expand Up @@ -209,17 +221,17 @@ func (cr *VMSingleSpec) UnmarshalJSON(src []byte) error {
// VMSingleStatus defines the observed state of VMSingle
// +k8s:openapi-gen=true
type VMSingleStatus struct {
// ReplicaCount Total number of non-terminated pods targeted by this VMAlert
// cluster (their labels match the selector).
// ReplicaCount Total number of non-terminated pods targeted by this VMSingle.
Replicas int32 `json:"replicas"`
// UpdatedReplicas Total number of non-terminated pods targeted by this VMAlert
// cluster that have the desired version spec.
// UpdatedReplicas Total number of non-terminated pods targeted by this VMSingle.
UpdatedReplicas int32 `json:"updatedReplicas"`
// AvailableReplicas Total number of available pods (ready for at least minReadySeconds)
// targeted by this VMAlert cluster.
// AvailableReplicas Total number of available pods (ready for at least minReadySeconds) targeted by this VMSingle.
AvailableReplicas int32 `json:"availableReplicas"`
// UnavailableReplicas Total number of unavailable pods targeted by this VMAlert cluster.
// UnavailableReplicas Total number of unavailable pods targeted by this VMSingle.
UnavailableReplicas int32 `json:"unavailableReplicas"`

SingleStatus SingleStatus `json:"singleStatus"`
Reason string `json:"reason,omitempty"`
}

// VMSingle is fast, cost-effective and scalable time-series database.
Expand Down Expand Up @@ -369,6 +381,27 @@ func (cr *VMSingle) AsCRDOwner() []metav1.OwnerReference {
return GetCRDAsOwner(Single)
}

// LastAppliedSpecAsPatch return last applied single spec as patch annotation
func (cr *VMSingle) LastAppliedSpecAsPatch() (client.Patch, error) {
data, err := json.Marshal(cr.Spec)
if err != nil {
return nil, fmt.Errorf("possible bug, cannot serialize single specification as json :%w", err)
}
patch := fmt.Sprintf(`{"metadata":{"annotations":{"operator.victoriametrics/last-applied-spec": %q}}}`, data)
return client.RawPatch(types.MergePatchType, []byte(patch)), nil
}

// HasSpecChanges compares single spec with last applied single spec stored in annotation
func (cr *VMSingle) HasSpecChanges() (bool, error) {
var prevSingleSpec VMSingleSpec
lastAppliedSingleJSON := cr.Annotations["operator.victoriametrics/last-applied-spec"]
if err := json.Unmarshal([]byte(lastAppliedSingleJSON), &prevSingleSpec); err != nil {
return true, fmt.Errorf("cannot parse last applied single spec value: %s : %w", lastAppliedSingleJSON, err)
}
instanceSpecData, _ := json.Marshal(cr.Spec)
return !bytes.Equal([]byte(lastAppliedSingleJSON), instanceSpecData), nil
}

func init() {
SchemeBuilder.Register(&VMSingle{}, &VMSingleList{})
}
47 changes: 40 additions & 7 deletions api/victoriametrics/v1beta1/vmsingle_types.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
package v1beta1

import (
"bytes"
"encoding/json"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
SingleStatusExpanding SingleStatus = "expanding"
SingleStatusOperational SingleStatus = "operational"
SingleStatusFailed SingleStatus = "failed"
)

type SingleStatus string

// VMSingleSpec defines the desired state of VMSingle
// +k8s:openapi-gen=true
// +kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version",description="The version of VMSingle"
Expand Down Expand Up @@ -209,17 +221,17 @@ func (cr *VMSingleSpec) UnmarshalJSON(src []byte) error {
// VMSingleStatus defines the observed state of VMSingle
// +k8s:openapi-gen=true
type VMSingleStatus struct {
// ReplicaCount Total number of non-terminated pods targeted by this VMAlert
// cluster (their labels match the selector).
// ReplicaCount Total number of non-terminated pods targeted by this VMSingle.
Replicas int32 `json:"replicas"`
// UpdatedReplicas Total number of non-terminated pods targeted by this VMAlert
// cluster that have the desired version spec.
// UpdatedReplicas Total number of non-terminated pods targeted by this VMSingle.
UpdatedReplicas int32 `json:"updatedReplicas"`
// AvailableReplicas Total number of available pods (ready for at least minReadySeconds)
// targeted by this VMAlert cluster.
// AvailableReplicas Total number of available pods (ready for at least minReadySeconds) targeted by this VMSingle.
AvailableReplicas int32 `json:"availableReplicas"`
// UnavailableReplicas Total number of unavailable pods targeted by this VMAlert cluster.
// UnavailableReplicas Total number of unavailable pods targeted by this VMSingle.
UnavailableReplicas int32 `json:"unavailableReplicas"`

SingleStatus SingleStatus `json:"singleStatus"`
Reason string `json:"reason,omitempty"`
}

// VMSingle is fast, cost-effective and scalable time-series database.
Expand Down Expand Up @@ -369,6 +381,27 @@ func (cr *VMSingle) AsCRDOwner() []metav1.OwnerReference {
return GetCRDAsOwner(Single)
}

// LastAppliedSpecAsPatch return last applied single spec as patch annotation
func (cr *VMSingle) LastAppliedSpecAsPatch() (client.Patch, error) {
data, err := json.Marshal(cr.Spec)
if err != nil {
return nil, fmt.Errorf("possible bug, cannot serialize single specification as json :%w", err)
}
patch := fmt.Sprintf(`{"metadata":{"annotations":{"operator.victoriametrics/last-applied-spec": %q}}}`, data)
return client.RawPatch(types.MergePatchType, []byte(patch)), nil
}

// HasSpecChanges compares single spec with last applied single spec stored in annotation
func (cr *VMSingle) HasSpecChanges() (bool, error) {
var prevSingleSpec VMSingleSpec
lastAppliedSingleJSON := cr.Annotations["operator.victoriametrics/last-applied-spec"]
if err := json.Unmarshal([]byte(lastAppliedSingleJSON), &prevSingleSpec); err != nil {
return true, fmt.Errorf("cannot parse last applied single spec value: %s : %w", lastAppliedSingleJSON, err)
}
instanceSpecData, _ := json.Marshal(cr.Spec)
return !bytes.Equal([]byte(lastAppliedSingleJSON), instanceSpecData), nil
}

func init() {
SchemeBuilder.Register(&VMSingle{}, &VMSingleList{})
}
13 changes: 9 additions & 4 deletions config/crd/bases/operator.victoriametrics.com_vmsingles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1257,27 +1257,32 @@ spec:
properties:
availableReplicas:
description: AvailableReplicas Total number of available pods (ready
for at least minReadySeconds) targeted by this VMAlert cluster.
for at least minReadySeconds) targeted by this VMSingle.
format: int32
type: integer
reason:
type: string
replicas:
description: ReplicaCount Total number of non-terminated pods targeted
by this VMAlert cluster (their labels match the selector).
by this VMSingle.
format: int32
type: integer
singleStatus:
type: string
unavailableReplicas:
description: UnavailableReplicas Total number of unavailable pods
targeted by this VMAlert cluster.
targeted by this VMSingle.
format: int32
type: integer
updatedReplicas:
description: UpdatedReplicas Total number of non-terminated pods targeted
by this VMAlert cluster that have the desired version spec.
by this VMSingle.
format: int32
type: integer
required:
- availableReplicas
- replicas
- singleStatus
- unavailableReplicas
- updatedReplicas
type: object
Expand Down
11 changes: 5 additions & 6 deletions controllers/factory/vmsingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ const (
)

func CreateVMSingleStorage(ctx context.Context, cr *victoriametricsv1beta1.VMSingle, rclient client.Client) (*corev1.PersistentVolumeClaim, error) {

l := log.WithValues("vm.single.pvc.create", cr.Name)
l.Info("reconciling pvc")
newPvc := makeVMSinglePvc(cr)
Expand Down Expand Up @@ -86,7 +85,6 @@ func makeVMSinglePvc(cr *victoriametricsv1beta1.VMSingle) *corev1.PersistentVolu
}

func CreateOrUpdateVMSingle(ctx context.Context, cr *victoriametricsv1beta1.VMSingle, rclient client.Client, c *config.BaseOperatorConf) (*appsv1.Deployment, error) {

if err := psp.CreateServiceAccountForCRD(ctx, cr, rclient); err != nil {
return nil, fmt.Errorf("failed create service account: %w", err)
}
Expand All @@ -103,6 +101,9 @@ func CreateOrUpdateVMSingle(ctx context.Context, cr *victoriametricsv1beta1.VMSi
if err := k8stools.HandleDeployUpdate(ctx, rclient, newDeploy); err != nil {
return nil, err
}
if err = waitExpanding(ctx, rclient, cr.Namespace, cr.SelectorLabels(), *cr.Spec.ReplicaCount, c.PodWaitReadyTimeout); err != nil {
return nil, fmt.Errorf("cannot wait until ready status for single deploy: %w", err)
}

return newDeploy, nil
}
Expand Down Expand Up @@ -358,11 +359,9 @@ func makeSpecForVMSingle(cr *victoriametricsv1beta1.VMSingle, c *config.BaseOper
}

return vmSingleSpec, nil

}

func CreateOrUpdateVMSingleService(ctx context.Context, cr *victoriametricsv1beta1.VMSingle, rclient client.Client, c *config.BaseOperatorConf) (*corev1.Service, error) {

cr = cr.DeepCopy()
if cr.Spec.Port == "" {
cr.Spec.Port = c.VMSingleDefault.Port
Expand Down Expand Up @@ -431,11 +430,11 @@ func makeSpecForVMBackuper(
snapshotCreateURL := cr.SnapshotCreateURL
snapshotDeleteURL := cr.SnapShotDeleteURL
if snapshotCreateURL == "" {
//http://localhost:port/snaphsot/create
// http://localhost:port/snaphsot/create
snapshotCreateURL = cr.SnapshotCreatePathWithFlags(port, extraArgs)
}
if snapshotDeleteURL == "" {
//http://localhost:port/snaphsot/delete
// http://localhost:port/snaphsot/delete
snapshotDeleteURL = cr.SnapshotDeletePathWithFlags(port, extraArgs)
}
backupDst := cr.Destination
Expand Down
16 changes: 15 additions & 1 deletion controllers/factory/vmsingle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
)

func TestCreateOrUpdateVMSingle(t *testing.T) {
Expand All @@ -35,7 +36,13 @@ func TestCreateOrUpdateVMSingle(t *testing.T) {
Name: "vmsingle-base",
Namespace: "default",
},
Spec: victoriametricsv1beta1.VMSingleSpec{},
Spec: victoriametricsv1beta1.VMSingleSpec{ReplicaCount: pointer.Int32Ptr(1)},
},
},
predefinedObjects: []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "vmsingle-0", Labels: map[string]string{"app.kubernetes.io/component": "monitoring", "app.kubernetes.io/name": "vmsingle", "app.kubernetes.io/instance": "vmsingle-base", "managed-by": "vm-operator"}},
Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: "True"}}},
},
},
want: &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "vmsingle-vmsingle-base", Namespace: "default"}},
Expand All @@ -56,9 +63,16 @@ func TestCreateOrUpdateVMSingle(t *testing.T) {
GraphitePort: "8053",
OpenTSDBPort: "8054",
},
ReplicaCount: pointer.Int32Ptr(1),
},
},
},
predefinedObjects: []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "vmsingle-0", Labels: map[string]string{"app.kubernetes.io/component": "monitoring", "app.kubernetes.io/name": "vmsingle", "app.kubernetes.io/instance": "vmsingle-base", "managed-by": "vm-operator"}},
Status: corev1.PodStatus{Phase: corev1.PodRunning, Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: "True"}}},
},
},
want: &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "vmsingle-vmsingle-base", Namespace: "default"}},
},
}
Expand Down
42 changes: 38 additions & 4 deletions controllers/vmsingle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ import (
"context"
"fmt"

"github.com/VictoriaMetrics/operator/controllers/factory/finalize"
"sigs.k8s.io/controller-runtime/pkg/builder"

"github.com/VictoriaMetrics/operator/controllers/factory"
"github.com/VictoriaMetrics/operator/internal/config"
"github.com/go-logr/logr"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"

victoriametricsv1beta1 "github.com/VictoriaMetrics/operator/api/v1beta1"
"github.com/VictoriaMetrics/operator/controllers/factory/finalize"
)

// VMSingleReconciler reconciles a VMSingle object
Expand Down Expand Up @@ -73,6 +73,17 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
if instance.Spec.ParsingError != "" {
return handleParsingError(instance.Spec.ParsingError, instance)
}
specChanged, err := instance.HasSpecChanges()
if err != nil {
reqLogger.Error(err, "failed to check if single spec changed")
}
if specChanged && instance.Status.SingleStatus != victoriametricsv1beta1.SingleStatusFailed {
instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusExpanding
if err := r.Client.Status().Update(ctx, instance); err != nil {
return result, fmt.Errorf("cannot set expanding status for single: %w", err)
}
}

if err := finalize.AddFinalizer(ctx, r.Client, instance); err != nil {
return result, err
}
Expand All @@ -90,7 +101,12 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r

_, err = factory.CreateOrUpdateVMSingle(ctx, instance, r, r.BaseConf)
Haleygo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return result, err
instance.Status.Reason = err.Error()
instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusFailed
if err := r.Client.Status().Update(ctx, instance); err != nil {
log.Error(err, "cannot set failed status for single")
}
return result, fmt.Errorf("failed create or update single: %w", err)
}

svc, err := factory.CreateOrUpdateVMSingleService(ctx, instance, r, r.BaseConf)
Expand All @@ -104,6 +120,24 @@ func (r *VMSingleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
reqLogger.Error(err, "cannot create serviceScrape for vmsingle")
}
}

instance.Status.Reason = ""
instance.Status.SingleStatus = victoriametricsv1beta1.SingleStatusOperational
if err := r.Client.Status().Update(ctx, instance); err != nil {
return result, fmt.Errorf("cannot update single status: %w", err)
}

if specChanged {
specPatch, err := instance.LastAppliedSpecAsPatch()
if err != nil {
return ctrl.Result{}, fmt.Errorf("cannot parse last applied spec for single: %w", err)
}
// use patch instead of update, only 1 field must be changed.
if err := r.Client.Patch(ctx, instance, specPatch); err != nil {
return result, fmt.Errorf("cannot update single with last applied spec: %w", err)
}
}

return
}

Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ type BaseOperatorConf struct {
PodWaitReadyTimeout time.Duration `default:"80s"`
PodWaitReadyIntervalCheck time.Duration `default:"5s"`
PodWaitReadyInitDelay time.Duration `default:"10s"`
// configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth
// configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth.
ForceResyncInterval time.Duration `default:"60s"`
}

Expand Down