Skip to content

Commit

Permalink
Initial implementation for the restore queue
Browse files Browse the repository at this point in the history
Adds the estimated queue to the NAB restore object

Signed-off-by: Michal Pryc <mpryc@redhat.com>
  • Loading branch information
mpryc committed Dec 10, 2024
1 parent 56afada commit 80b1e2d
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 6 deletions.
7 changes: 7 additions & 0 deletions api/v1alpha1/nonadminrestore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type NonAdminRestoreStatus struct {
// +optional
VeleroRestore *VeleroRestore `json:"veleroRestore,omitempty"`

// queueInfo is used to estimate how many restores are scheduled before the given VeleroRestore in the OADP namespace.
// This number is not guaranteed to be accurate, but it should be close. It's inaccurate for cases when
// Velero pod is not running or being restarted after Restore object were created.
// It counts only VeleroRestores that are still subject to be handled by OADP/Velero.
// +optional
QueueInfo *QueueInfo `json:"queueInfo,omitempty"`

// phase is a simple one high-level summary of the lifecycle of an NonAdminRestore.
Phase NonAdminPhase `json:"phase,omitempty"`

Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions config/crd/bases/oadp.openshift.io_nonadminrestores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,20 @@ spec:
- Created
- Deleting
type: string
queueInfo:
description: |-
queueInfo is used to estimate how many restores are scheduled before the given VeleroRestore in the OADP namespace.
This number is not guaranteed to be accurate, but it should be close. It's inaccurate for cases when
Velero pod is not running or being restarted after Restore object were created.
It counts only VeleroRestores that are still subject to be handled by OADP/Velero.
properties:
estimatedQueuePosition:
description: estimatedQueuePosition is the number of operations
ahead in the queue (0 if not queued)
type: integer
required:
- estimatedQueuePosition
type: object
veleroRestore:
description: VeleroRestore contains information of the related Velero
restore object.
Expand Down
73 changes: 73 additions & 0 deletions internal/common/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,79 @@ func GetBackupQueueInfo(ctx context.Context, clientInstance client.Client, names
return queueInfo, nil
}

// GetActiveVeleroRestoresByLabel retrieves all VeleroRestore objects based on a specified label within a given namespace.
// It returns a slice of VeleroRestore objects or nil if none are found.
func GetActiveVeleroRestoresByLabel(ctx context.Context, clientInstance client.Client, namespace, labelKey, labelValue string) ([]velerov1.Restore, error) {
var veleroRestoreList velerov1.RestoreList
labelSelector := client.MatchingLabels{labelKey: labelValue}

if err := clientInstance.List(ctx, &veleroRestoreList, client.InNamespace(namespace), labelSelector); err != nil {
return nil, err
}

// Filter out restores with a CompletionTimestamp
var activeRestores []velerov1.Restore
for _, restore := range veleroRestoreList.Items {
if restore.Status.CompletionTimestamp == nil {
activeRestores = append(activeRestores, restore)
}
}

if len(activeRestores) == 0 {
return nil, nil
}

return activeRestores, nil
}

// GetRestoreQueueInfo determines the queue position of the specified VeleroRestore.
// It calculates how many queued Restores exist in the namespace that were created before this one.
func GetRestoreQueueInfo(ctx context.Context, clientInstance client.Client, namespace string, targetRestore *velerov1.Restore) (nacv1alpha1.QueueInfo, error) {
var queueInfo nacv1alpha1.QueueInfo

// If the target restore has no valid CreationTimestamp, it means that it's not yet reconciled by OADP/Velero.
// In this case, we can't determine its queue position, so we return nil.
if targetRestore == nil || targetRestore.CreationTimestamp.IsZero() {
return queueInfo, nil
}

// If the target restore has a CompletionTimestamp, it means that it's already served.
if targetRestore.Status.CompletionTimestamp != nil {
queueInfo.EstimatedQueuePosition = 0
return queueInfo, nil
}

// List all Restore objects in the namespace
var restoreList velerov1.RestoreList
if err := clientInstance.List(ctx, &restoreList, client.InNamespace(namespace)); err != nil {
return queueInfo, err
}

// Extract the target restore's creation timestamp
targetTimestamp := targetRestore.CreationTimestamp.Time

// The target restore is always in queue at least in the first position
// 0 is reserved for the restores that are already served.
queueInfo.EstimatedQueuePosition = 1

// Iterate through restores and calculate position
for i := range restoreList.Items {
restore := &restoreList.Items[i]

// Skip restores that have CompletionTimestamp set. This means that the Velero won't be further processing this restore.
if restore.Status.CompletionTimestamp != nil {
continue
}

// Count restores created earlier than the target restore
if restore.CreationTimestamp.Time.Before(targetTimestamp) {
queueInfo.EstimatedQueuePosition++
}
}

return queueInfo, nil
}

// GetVeleroDeleteBackupRequestByLabel retrieves a DeleteBackupRequest object based on a specified label within a given namespace.
// It returns the DeleteBackupRequest only when exactly one object is found, throws an error if multiple backups are found,
// or returns nil if no matches are found.
Expand Down
15 changes: 13 additions & 2 deletions internal/controller/nonadminrestore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,18 @@ func (r *NonAdminRestoreReconciler) createVeleroRestore(ctx context.Context, log
logger.Info("VeleroRestore successfully created")
}

// TODO(migi): do we need estimatedQueuePosition in VeleroRestoreStatus?
updatedQueueInfo := false

// Determine how many Backups are scheduled before the given VeleroRestore in the OADP namespace.
queueInfo, err := function.GetRestoreQueueInfo(ctx, r.Client, r.OADPNamespace, veleroRestore)
if err != nil {
// Log error and continue with the reconciliation, this is not critical error as it's just
// about the Velero Restore queue position information
logger.Error(err, "Failed to get the queue position for the VeleroRestore")
} else {
nar.Status.QueueInfo = &queueInfo
updatedQueueInfo = true
}

updatedPhase := updateNonAdminPhase(&nar.Status.Phase, nacv1alpha1.NonAdminPhaseCreated)

Expand All @@ -391,7 +402,7 @@ func (r *NonAdminRestoreReconciler) createVeleroRestore(ctx context.Context, log

updatedVeleroStatus := updateVeleroRestoreStatus(&nar.Status, veleroRestore)

if updatedPhase || updatedCondition || updatedVeleroStatus {
if updatedPhase || updatedCondition || updatedVeleroStatus || updatedQueueInfo {
if err := r.Status().Update(ctx, nar); err != nil {
logger.Error(err, nonAdminRestoreStatusUpdateFailureMessage)
return false, err
Expand Down
97 changes: 97 additions & 0 deletions internal/handler/velerorestore_queue_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package handler contains all event handlers of the project
package handler

import (
"context"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/migtools/oadp-non-admin/internal/common/constant"
"github.com/migtools/oadp-non-admin/internal/common/function"
)

// VeleroRestoreQueueHandler contains event handlers for Velero Restore objects
type VeleroRestoreQueueHandler struct {
Client client.Client
OADPNamespace string
}

// Create event handler
func (VeleroRestoreQueueHandler) Create(_ context.Context, _ event.CreateEvent, _ workqueue.RateLimitingInterface) {
// Create event handler for the Restore object
}

// Update event handler adds Velero Restore's NonAdminRestore to controller queue
func (h VeleroRestoreQueueHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
// Only update to the first in the queue Velero Restore should trigger changes to the
// NonAdminRestore objects. Updates to the Velero Restore 2nd and 3rd does not lower the
// queue. This optimizes the number of times we need to update the NonAdminRestore objects
// and the number of Velero Restore objects we need to react on.

logger := function.GetLogger(ctx, evt.ObjectNew, "VeleroBackupQueueHandler")

// Fetching Velero Restores triggered by NonAdminRestore to optimize our reconcile cycles
restores, err := function.GetActiveVeleroRestoresByLabel(ctx, h.Client, h.OADPNamespace, constant.ManagedByLabel, constant.ManagedByLabelValue)
if err != nil {
logger.Error(err, "Failed to get Velero Restores by label")
return
}

if restores == nil {
// That should't really be the case as our Update event was triggered by a Velero Restore
// object that has a new CompletionTimestamp.
logger.V(1).Info("No pending velero restores found in namespace.", constant.NamespaceString, h.OADPNamespace)
} else {
nabEventAnnotations := evt.ObjectNew.GetAnnotations()
nabEventOriginNamespace := nabEventAnnotations[constant.NabOriginNamespaceAnnotation]
nabEventOriginName := nabEventAnnotations[constant.NabOriginNameAnnotation]

for _, restore := range restores {
annotations := restore.GetAnnotations()
nabOriginNamespace := annotations[constant.NabOriginNamespaceAnnotation]
nabOriginName := annotations[constant.NabOriginNameAnnotation]

// This object is within current queue, so there is no need to trigger changes to it.
// The VeleroBackupHandler will serve for that.
if nabOriginNamespace != nabEventOriginNamespace || nabOriginName != nabEventOriginName {
logger.V(1).Info("Processing Queue update for the NonAdmin Restore referenced by Velero Restore", "Name", restore.Name, constant.NamespaceString, restore.Namespace, "CreatedAt", restore.CreationTimestamp)
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: nabOriginName,
Namespace: nabOriginNamespace,
}})
} else {
logger.V(1).Info("Ignoring Queue update for the NonAdmin Restore that triggered this event", "Name", restore.Name, constant.NamespaceString, restore.Namespace, "CreatedAt", restore.CreationTimestamp)
}
}
}
}

// Delete event handler
func (VeleroRestoreQueueHandler) Delete(_ context.Context, _ event.DeleteEvent, _ workqueue.RateLimitingInterface) {
// Delete event handler for the Restore object
}

// Generic event handler
func (VeleroRestoreQueueHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) {
// Generic event handler for the Restore object
}
9 changes: 5 additions & 4 deletions internal/predicate/compositerestore_predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (

// CompositeRestorePredicate is a combination of NonAdminRestore and Velero Restore event filters
type CompositeRestorePredicate struct {
Context context.Context
NonAdminRestorePredicate NonAdminRestorePredicate
VeleroRestorePredicate VeleroRestorePredicate
Context context.Context
NonAdminRestorePredicate NonAdminRestorePredicate
VeleroRestorePredicate VeleroRestorePredicate
VeleroRestoreQueuePredicate VeleroRestoreQueuePredicate
}

// Create event filter only accepts NonAdminRestore create events
Expand All @@ -48,7 +49,7 @@ func (p CompositeRestorePredicate) Update(evt event.UpdateEvent) bool {
case *nacv1alpha1.NonAdminRestore:
return p.NonAdminRestorePredicate.Update(p.Context, evt)
case *velerov1.Restore:
return p.VeleroRestorePredicate.Update(p.Context, evt)
return p.VeleroRestoreQueuePredicate.Update(p.Context, evt) || p.VeleroRestorePredicate.Update(p.Context, evt)
default:
return false
}
Expand Down
60 changes: 60 additions & 0 deletions internal/predicate/velerorestore_queue_predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package predicate

import (
"context"

velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/migtools/oadp-non-admin/internal/common/function"
)

// VeleroRestoreQueuePredicate contains event filters for Velero Restore objects
type VeleroRestoreQueuePredicate struct {
OADPNamespace string
}

// Update event filter only accepts Velero Restore update events from the OADP namespace
// and from Velero Restores that have a new CompletionTimestamp. We are not interested in
// checking if the Velero Restore contains NonAdminRestore metadata, because every Velero Restore
// may change the Queue position of the NonAdminRestore object.
func (p VeleroRestoreQueuePredicate) Update(ctx context.Context, evt event.UpdateEvent) bool {
logger := function.GetLogger(ctx, evt.ObjectNew, "VeleroRestoreQueuePredicate")

// Ensure the new and old objects are of the expected type
newRestore, okNew := evt.ObjectNew.(*velerov1.Restore)
oldRestore, okOld := evt.ObjectOld.(*velerov1.Restore)

if !okNew || !okOld {
logger.V(1).Info("Rejected Restore Update event: invalid object type")
return false
}

namespace := newRestore.GetNamespace()

if namespace == p.OADPNamespace {
if oldRestore.Status.CompletionTimestamp == nil && newRestore.Status.CompletionTimestamp != nil {
logger.V(1).Info("Accepted Restore Update event: new completion timestamp")
return true
}
}

logger.V(1).Info("Rejected Restore Update event: no changes to the CompletionTimestamp in the VeleroRestore object")
return false
}

0 comments on commit 80b1e2d

Please sign in to comment.