From d8fc5f6313020fb8b5b076848e7a9ab7039961cb Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Thu, 2 Dec 2021 20:59:21 +0800 Subject: [PATCH 1/2] Migrate backup sync controller from code-generator to kubebuilder 1. use kubebuilder's reconcile logic to replace controller's old logic. 2. use ginkgo and gomega to replace testing. 3. modify BSL reconciler registration method. Signed-off-by: Xun Jiang --- changelogs/unreleased/5218-jxun | 1 + pkg/cmd/server/server.go | 77 +- .../backup_storage_location_controller.go | 60 +- ...backup_storage_location_controller_test.go | 32 +- pkg/controller/backup_sync_controller.go | 273 +++--- pkg/controller/backup_sync_controller_test.go | 854 ++++++++---------- 6 files changed, 636 insertions(+), 661 deletions(-) create mode 100644 changelogs/unreleased/5218-jxun diff --git a/changelogs/unreleased/5218-jxun b/changelogs/unreleased/5218-jxun new file mode 100644 index 0000000000..d2274476e0 --- /dev/null +++ b/changelogs/unreleased/5218-jxun @@ -0,0 +1 @@ +Migrate backup sync controller from code-generator to kubebuilder. 
\ No newline at end of file diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index f45a302207..431943bba6 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -51,6 +51,7 @@ import ( snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1" "github.com/vmware-tanzu/velero/internal/credentials" + "github.com/vmware-tanzu/velero/internal/storage" "github.com/vmware-tanzu/velero/pkg/backup" "github.com/vmware-tanzu/velero/pkg/buildinfo" "github.com/vmware-tanzu/velero/pkg/client" @@ -77,7 +78,6 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" - "github.com/vmware-tanzu/velero/internal/storage" "github.com/vmware-tanzu/velero/internal/util/managercontroller" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" @@ -603,29 +603,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string csiVSLister, csiVSCLister, csiVSClassLister := s.getCSISnapshotListers() - backupSyncControllerRunInfo := func() controllerRunInfo { - backupSyncContoller := controller.NewBackupSyncController( - s.veleroClient.VeleroV1(), - s.mgr.GetClient(), - s.veleroClient.VeleroV1(), - s.sharedInformerFactory.Velero().V1().Backups().Lister(), - csiVSLister, - s.config.backupSyncPeriod, - s.namespace, - s.csiSnapshotClient, - s.kubeClient, - s.config.defaultBackupLocation, - newPluginManager, - backupStoreGetter, - s.logger, - ) - - return controllerRunInfo{ - controller: backupSyncContoller, - numWorkers: defaultControllerWorkers, - } - } - backupTracker := controller.NewBackupTracker() backupControllerRunInfo := func() controllerRunInfo { @@ -727,16 +704,21 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } + // By far, PodVolumeBackup, PodVolumeRestore, BackupStorageLocation controllers + // are not included in 
--disable-controllers list. + // This is because PVB and PVR are used by the Restic DaemonSet, + // and BSL controller is mandatory for Velero to work. enabledControllers := map[string]func() controllerRunInfo{ - controller.BackupSync: backupSyncControllerRunInfo, controller.Backup: backupControllerRunInfo, controller.GarbageCollection: gcControllerRunInfo, controller.Restore: restoreControllerRunInfo, } // Note: all runtime type controllers that can be disabled are grouped separately, below: - enabledRuntimeControllers := make(map[string]struct{}) - enabledRuntimeControllers[controller.ServerStatusRequest] = struct{}{} - enabledRuntimeControllers[controller.DownloadRequest] = struct{}{} + enabledRuntimeControllers := map[string]struct{}{ + controller.ServerStatusRequest: {}, + controller.DownloadRequest: {}, + controller.BackupSync: {}, + } if s.config.restoreOnly { s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers") @@ -749,7 +731,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } // Remove disabled controllers so they are not initialized. If a match is not found we want - // to hault the system so the user knows this operation was not possible. + // to halt the system so the user knows this operation was not possible. 
if err := removeControllers(s.config.disabledControllers, enabledControllers, enabledRuntimeControllers, s.logger); err != nil { log.Fatal(err, "unable to disable a controller") } @@ -783,18 +765,18 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.logger.WithField("informer", informer).Info("Informer cache synced") } - bslr := controller.BackupStorageLocationReconciler{ - Ctx: s.ctx, - Client: s.mgr.GetClient(), - Scheme: s.mgr.GetScheme(), - DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ + bslr := controller.NewBackupStorageLocationReconciler( + s.ctx, + s.mgr.GetClient(), + s.mgr.GetScheme(), + storage.DefaultBackupLocationInfo{ StorageLocation: s.config.defaultBackupLocation, ServerValidationFrequency: s.config.storeValidationFrequency, }, - NewPluginManager: newPluginManager, - BackupStoreGetter: backupStoreGetter, - Log: s.logger, - } + newPluginManager, + backupStoreGetter, + s.logger, + ) if err := bslr.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupStorageLocation) } @@ -848,6 +830,25 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } + if _, ok := enabledRuntimeControllers[controller.BackupSync]; ok { + syncPeriod := s.config.backupSyncPeriod + if syncPeriod <= 0 { + syncPeriod = time.Minute + } + + backupSyncReconciler := controller.NewBackupSyncReconciler( + s.mgr.GetClient(), + s.namespace, + syncPeriod, + newPluginManager, + backupStoreGetter, + s.logger, + ) + if err := backupSyncReconciler.SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupSync) + } + } + // TODO(2.0): presuming all controllers and resources are converted to runtime-controller // by v2.0, the block from this line and including the `s.mgr.Start() will be // deprecated, since the manager auto-starts all the caches. 
Until then, we need to start the diff --git a/pkg/controller/backup_storage_location_controller.go b/pkg/controller/backup_storage_location_controller.go index 1b08da897e..036e8dc8e7 100644 --- a/pkg/controller/backup_storage_location_controller.go +++ b/pkg/controller/backup_storage_location_controller.go @@ -44,35 +44,55 @@ const ( ) // BackupStorageLocationReconciler reconciles a BackupStorageLocation object -type BackupStorageLocationReconciler struct { - Ctx context.Context - Client client.Client - Scheme *runtime.Scheme - DefaultBackupLocationInfo storage.DefaultBackupLocationInfo +type backupStorageLocationReconciler struct { + ctx context.Context + client client.Client + scheme *runtime.Scheme + defaultBackupLocationInfo storage.DefaultBackupLocationInfo // use variables to refer to these functions so they can be // replaced with fakes for testing. - NewPluginManager func(logrus.FieldLogger) clientmgmt.Manager - BackupStoreGetter persistence.ObjectBackupStoreGetter + newPluginManager func(logrus.FieldLogger) clientmgmt.Manager + backupStoreGetter persistence.ObjectBackupStoreGetter - Log logrus.FieldLogger + log logrus.FieldLogger +} + +// NewBackupStorageLocationReconciler initialize and return a backupStorageLocationReconciler struct +func NewBackupStorageLocationReconciler( + ctx context.Context, + client client.Client, + scheme *runtime.Scheme, + defaultBackupLocationInfo storage.DefaultBackupLocationInfo, + newPluginManager func(logrus.FieldLogger) clientmgmt.Manager, + backupStoreGetter persistence.ObjectBackupStoreGetter, + log logrus.FieldLogger) *backupStorageLocationReconciler { + return &backupStorageLocationReconciler{ + ctx: ctx, + client: client, + scheme: scheme, + defaultBackupLocationInfo: defaultBackupLocationInfo, + newPluginManager: newPluginManager, + backupStoreGetter: backupStoreGetter, + log: log, + } } // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get;list;watch;create;update;patch;delete // 
+kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations/status,verbs=get;update;patch -func (r *BackupStorageLocationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *backupStorageLocationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var unavailableErrors []string var location velerov1api.BackupStorageLocation - log := r.Log.WithField("controller", BackupStorageLocation).WithField(BackupStorageLocation, req.NamespacedName.String()) + log := r.log.WithField("controller", BackupStorageLocation).WithField(BackupStorageLocation, req.NamespacedName.String()) log.Debug("Validating availability of BackupStorageLocation") - locationList, err := storage.ListBackupStorageLocations(r.Ctx, r.Client, req.Namespace) + locationList, err := storage.ListBackupStorageLocations(r.ctx, r.client, req.Namespace) if err != nil { log.WithError(err).Error("No BackupStorageLocations found, at least one is required") return ctrl.Result{}, nil } - pluginManager := r.NewPluginManager(log) + pluginManager := r.newPluginManager(log) defer pluginManager.CleanupClients() var defaultFound bool @@ -93,7 +113,7 @@ func (r *BackupStorageLocationReconciler) Reconcile(ctx context.Context, req ctr isDefault := location.Spec.Default // TODO(2.0) remove this check since the server default will be deprecated - if !defaultFound && location.Name == r.DefaultBackupLocationInfo.StorageLocation { + if !defaultFound && location.Name == r.defaultBackupLocationInfo.StorageLocation { // For backward-compatible, to configure the backup storage location as the default if // none of the BSLs be marked as the default and the BSL name matches against the // "velero server --default-backup-storage-location". 
@@ -117,12 +137,12 @@ func (r *BackupStorageLocationReconciler) Reconcile(ctx context.Context, req ctr location.Status.Phase = velerov1api.BackupStorageLocationPhaseAvailable location.Status.Message = "" } - if err := r.Client.Patch(r.Ctx, &location, client.MergeFrom(original)); err != nil { + if err := r.client.Patch(r.ctx, &location, client.MergeFrom(original)); err != nil { log.WithError(err).Error("Error updating BackupStorageLocation phase") } }() - backupStore, err := r.BackupStoreGetter.Get(&location, pluginManager, log) + backupStore, err := r.backupStoreGetter.Get(&location, pluginManager, log) if err != nil { log.WithError(err).Error("Error getting a backup store") return @@ -144,11 +164,11 @@ func (r *BackupStorageLocationReconciler) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, nil } -func (r *BackupStorageLocationReconciler) logReconciledPhase(defaultFound bool, locationList velerov1api.BackupStorageLocationList, errs []string) { +func (r *backupStorageLocationReconciler) logReconciledPhase(defaultFound bool, locationList velerov1api.BackupStorageLocationList, errs []string) { var availableBSLs []*velerov1api.BackupStorageLocation var unAvailableBSLs []*velerov1api.BackupStorageLocation var unknownBSLs []*velerov1api.BackupStorageLocation - log := r.Log.WithField("controller", BackupStorageLocation) + log := r.log.WithField("controller", BackupStorageLocation) for i, location := range locationList.Items { phase := location.Status.Phase @@ -181,16 +201,16 @@ func (r *BackupStorageLocationReconciler) logReconciledPhase(defaultFound bool, } } -func (r *BackupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *backupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) error { g := kube.NewPeriodicalEnqueueSource( - r.Log, + r.log, mgr.GetClient(), &velerov1api.BackupStorageLocationList{}, bslValidationEnqueuePeriod, // Add filter function to enqueue BSL per ValidationFrequency setting. 
func(object client.Object) bool { location := object.(*velerov1api.BackupStorageLocation) - return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.DefaultBackupLocationInfo.ServerValidationFrequency, r.Log.WithField("controller", BackupStorageLocation)) + return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation)) }, ) return ctrl.NewControllerManagedBy(mgr). diff --git a/pkg/controller/backup_storage_location_controller_test.go b/pkg/controller/backup_storage_location_controller_test.go index 75ad691a25..06d3458c4d 100644 --- a/pkg/controller/backup_storage_location_controller_test.go +++ b/pkg/controller/backup_storage_location_controller_test.go @@ -79,16 +79,16 @@ var _ = Describe("Backup Storage Location Reconciler", func() { // Setup reconciler Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) - r := BackupStorageLocationReconciler{ - Ctx: ctx, - Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(), - DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ + r := backupStorageLocationReconciler{ + ctx: ctx, + client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(), + defaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ StorageLocation: "location-1", ServerValidationFrequency: 0, }, - NewPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, - BackupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), - Log: velerotest.NewLogger(), + newPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, + backupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), + log: velerotest.NewLogger(), } // Assertions @@ -101,7 +101,7 @@ var _ = Describe("Backup Storage Location Reconciler", func() { key 
:= client.ObjectKey{Name: location.Name, Namespace: location.Namespace} instance := &velerov1api.BackupStorageLocation{} - err = r.Client.Get(ctx, key, instance) + err = r.client.Get(ctx, key, instance) Expect(err).To(BeNil()) Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault)) Expect(instance.Status.Phase).To(BeIdenticalTo(tests[i].expectedPhase)) @@ -144,16 +144,16 @@ var _ = Describe("Backup Storage Location Reconciler", func() { // Setup reconciler Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) - r := BackupStorageLocationReconciler{ - Ctx: ctx, - Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(), - DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ + r := backupStorageLocationReconciler{ + ctx: ctx, + client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(), + defaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ StorageLocation: "default", ServerValidationFrequency: 0, }, - NewPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, - BackupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), - Log: velerotest.NewLogger(), + newPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, + backupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), + log: velerotest.NewLogger(), } // Assertions @@ -166,7 +166,7 @@ var _ = Describe("Backup Storage Location Reconciler", func() { key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace} instance := &velerov1api.BackupStorageLocation{} - err = r.Client.Get(ctx, key, instance) + err = r.client.Get(ctx, key, instance) Expect(err).To(BeNil()) Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault)) } diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index c19badd6e3..f900c1aab2 100644 --- a/pkg/controller/backup_sync_controller.go 
+++ b/pkg/controller/backup_sync_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2020 the Velero contributors. +Copyright The Velero Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,138 +21,88 @@ import ( "time" snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" - snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" - snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" kuberrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/kubernetes" "github.com/vmware-tanzu/velero/pkg/util/kube" "github.com/vmware-tanzu/velero/internal/storage" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/features" - velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1" - velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1" "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/persistence" "github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" ) -type backupSyncController struct { - *genericController +const ( + backupSyncReconcilePeriod = time.Minute +) - backupClient velerov1client.BackupsGetter - kbClient client.Client - podVolumeBackupClient velerov1client.PodVolumeBackupsGetter - backupLister velerov1listers.BackupLister - csiVSLister snapshotv1listers.VolumeSnapshotLister - csiSnapshotClient 
*snapshotterClientSet.Clientset - kubeClient kubernetes.Interface +type backupSyncReconciler struct { + client client.Client namespace string - defaultBackupLocation string defaultBackupSyncPeriod time.Duration newPluginManager func(logrus.FieldLogger) clientmgmt.Manager backupStoreGetter persistence.ObjectBackupStoreGetter + logger logrus.FieldLogger } -func NewBackupSyncController( - backupClient velerov1client.BackupsGetter, - kbClient client.Client, - podVolumeBackupClient velerov1client.PodVolumeBackupsGetter, - backupLister velerov1listers.BackupLister, - csiVSLister snapshotv1listers.VolumeSnapshotLister, - syncPeriod time.Duration, +// NewBackupSyncReconciler is used to generate BackupSync reconciler structure. +func NewBackupSyncReconciler( + client client.Client, namespace string, - csiSnapshotClient *snapshotterClientSet.Clientset, - kubeClient kubernetes.Interface, - defaultBackupLocation string, + defaultBackupSyncPeriod time.Duration, newPluginManager func(logrus.FieldLogger) clientmgmt.Manager, backupStoreGetter persistence.ObjectBackupStoreGetter, - logger logrus.FieldLogger, -) Interface { - if syncPeriod <= 0 { - syncPeriod = time.Minute - } - logger.Infof("Backup sync period is %v", syncPeriod) - - c := &backupSyncController{ - genericController: newGenericController(BackupSync, logger), - backupClient: backupClient, - kbClient: kbClient, - podVolumeBackupClient: podVolumeBackupClient, + logger logrus.FieldLogger) *backupSyncReconciler { + return &backupSyncReconciler{ + client: client, namespace: namespace, - defaultBackupLocation: defaultBackupLocation, - defaultBackupSyncPeriod: syncPeriod, - backupLister: backupLister, - csiVSLister: csiVSLister, - csiSnapshotClient: csiSnapshotClient, - kubeClient: kubeClient, - - // use variables to refer to these functions so they can be - // replaced with fakes for testing. 
- newPluginManager: newPluginManager, - backupStoreGetter: backupStoreGetter, - } - - c.resyncFunc = c.run - c.resyncPeriod = 30 * time.Second - - return c -} - -// orderedBackupLocations returns a new slice with the default backup location first (if it exists), -// followed by the rest of the locations in no particular order. -func orderedBackupLocations(locationList *velerov1api.BackupStorageLocationList, defaultLocationName string) []velerov1api.BackupStorageLocation { - var result []velerov1api.BackupStorageLocation - - for i := range locationList.Items { - if locationList.Items[i].Name == defaultLocationName { - // put the default location first - result = append(result, locationList.Items[i]) - // append everything before the default - result = append(result, locationList.Items[:i]...) - // append everything after the default - result = append(result, locationList.Items[i+1:]...) - - return result - } + defaultBackupSyncPeriod: defaultBackupSyncPeriod, + newPluginManager: newPluginManager, + backupStoreGetter: backupStoreGetter, + logger: logger, } - - return locationList.Items } -func (c *backupSyncController) run() { - c.logger.Debug("Checking for existing backup storage locations to sync into cluster") +// Reconcile syncs between the backups in cluster and backups metadata in object store. 
+func (b *backupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := b.logger.WithField("controller", BackupSync) + log.Debug("Checking for existing backup storage locations to sync into cluster.") - locationList, err := storage.ListBackupStorageLocations(context.Background(), c.kbClient, c.namespace) + locationList, err := storage.ListBackupStorageLocations(ctx, b.client, b.namespace) if err != nil { - c.logger.WithError(err).Error("No backup storage locations found, at least one is required") - return + log.WithError(err).Error("No backup storage locations found, at least one is required") + return ctrl.Result{Requeue: false}, err } // sync the default backup storage location first, if it exists + defaultBackupLocationName := "" for _, location := range locationList.Items { if location.Spec.Default { - c.defaultBackupLocation = location.Name + defaultBackupLocationName = location.Name break } } - locations := orderedBackupLocations(&locationList, c.defaultBackupLocation) + locations := orderedBackupLocations(&locationList, defaultBackupLocationName) - pluginManager := c.newPluginManager(c.logger) + pluginManager := b.newPluginManager(log) defer pluginManager.CleanupClients() for _, location := range locations { - log := c.logger.WithField("backupLocation", location.Name) + log := log.WithField("backupLocation", location.Name) - syncPeriod := c.defaultBackupSyncPeriod + syncPeriod := b.defaultBackupSyncPeriod if location.Spec.BackupSyncPeriod != nil { syncPeriod = location.Spec.BackupSyncPeriod.Duration if syncPeriod == 0 { @@ -162,7 +112,7 @@ func (c *backupSyncController) run() { if syncPeriod < 0 { log.Debug("Backup sync period must be non-negative") - syncPeriod = c.defaultBackupSyncPeriod + syncPeriod = b.defaultBackupSyncPeriod } } @@ -177,7 +127,7 @@ func (c *backupSyncController) run() { log.Debug("Checking backup location for backups to sync into cluster") - backupStore, err := c.backupStoreGetter.Get(&location, 
pluginManager, log) + backupStore, err := b.backupStoreGetter.Get(&location, pluginManager, log) if err != nil { log.WithError(err).Error("Error getting backup store for this location") continue @@ -193,16 +143,22 @@ func (c *backupSyncController) run() { log.WithField("backupCount", len(backupStoreBackups)).Debug("Got backups from backup store") // get a list of all the backups that exist as custom resources in the cluster - clusterBackups, err := c.backupLister.Backups(c.namespace).List(labels.Everything()) + var clusterBackupList velerov1api.BackupList + listOption := client.ListOptions{ + LabelSelector: labels.Everything(), + Namespace: b.namespace, + } + + err = b.client.List(ctx, &clusterBackupList, &listOption) if err != nil { log.WithError(errors.WithStack(err)).Error("Error getting backups from cluster, proceeding with sync into cluster") } else { - log.WithField("backupCount", len(clusterBackups)).Debug("Got backups from cluster") + log.WithField("backupCount", len(clusterBackupList.Items)).Debug("Got backups from cluster") } // get a list of backups that *are* in the backup storage location and *aren't* in the cluster clusterBackupsSet := sets.NewString() - for _, b := range clusterBackups { + for _, b := range clusterBackupList.Items { clusterBackupsSet.Insert(b.Name) } backupsToSync := backupStoreBackups.Difference(clusterBackupsSet) @@ -224,7 +180,7 @@ func (c *backupSyncController) run() { continue } - backup.Namespace = c.namespace + backup.Namespace = b.namespace backup.ResourceVersion = "" // update the StorageLocation field and label since the name of the location @@ -237,7 +193,7 @@ func (c *backupSyncController) run() { backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation) // attempt to create backup custom resource via API - backup, err = c.backupClient.Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{}) + err = b.client.Create(ctx, backup, &client.CreateOptions{}) switch 
{ case err != nil && kuberrs.IsAlreadyExists(err): log.Debug("Backup already exists in cluster") @@ -274,7 +230,7 @@ func (c *backupSyncController) run() { podVolumeBackup.Namespace = backup.Namespace podVolumeBackup.ResourceVersion = "" - _, err = c.podVolumeBackupClient.PodVolumeBackups(backup.Namespace).Create(context.TODO(), podVolumeBackup, metav1.CreateOptions{}) + err = b.client.Create(ctx, podVolumeBackup, &client.CreateOptions{}) switch { case err != nil && kuberrs.IsAlreadyExists(err): log.Debug("Pod volume backup already exists in cluster") @@ -290,20 +246,25 @@ func (c *backupSyncController) run() { if features.IsEnabled(velerov1api.CSIFeatureFlag) { // we are syncing these objects only to ensure that the storage snapshots are cleaned up // on backup deletion or expiry. - log.Info("Syncing CSI volumesnapshotclasses in backup") + log.Info("Syncing CSI VolumeSnapshotClasses in backup") vsClasses, err := backupStore.GetCSIVolumeSnapshotClasses(backupName) if err != nil { - log.WithError(errors.WithStack(err)).Error("Error getting CSI volumesnapclasses for this backup from backup store") + log.WithError(errors.WithStack(err)).Error("Error getting CSI VolumeSnapshotClasses for this backup from backup store") continue } for _, vsClass := range vsClasses { vsClass.ResourceVersion = "" - created, err := c.csiSnapshotClient.SnapshotV1().VolumeSnapshotClasses().Create(context.TODO(), vsClass, metav1.CreateOptions{}) - if err != nil { - log.WithError(errors.WithStack(err)).Errorf("Error syncing volumesnapshotclass %s into cluster", vsClass.Name) + err := b.client.Create(ctx, vsClass, &client.CreateOptions{}) + switch { + case err != nil && kuberrs.IsAlreadyExists(err): + log.Debugf("VolumeSnapshotClass %s already exists in cluster", vsClass.Name) + continue + case err != nil && !kuberrs.IsAlreadyExists(err): + log.WithError(errors.WithStack(err)).Errorf("Error syncing VolumeSnapshotClass %s into cluster", vsClass.Name) + continue + default: + log.Infof("Created CSI 
VolumeSnapshotClass %s", vsClass.Name) } - log.Infof("Created CSI volumesnapshotclass %s", created.Name) } log.Info("Syncing CSI volumesnapshotcontents in backup") @@ -317,7 +278,7 @@ func (c *backupSyncController) run() { for _, snapCont := range snapConts { // TODO: Reset ResourceVersion prior to persisting VolumeSnapshotContents snapCont.ResourceVersion = "" - created, err := c.csiSnapshotClient.SnapshotV1().VolumeSnapshotContents().Create(context.TODO(), snapCont, metav1.CreateOptions{}) + err := b.client.Create(ctx, snapCont, &client.CreateOptions{}) switch { case err != nil && kuberrs.IsAlreadyExists(err): log.Debugf("volumesnapshotcontent %s already exists in cluster", snapCont.Name) @@ -326,73 +287,149 @@ func (c *backupSyncController) run() { log.WithError(errors.WithStack(err)).Errorf("Error syncing volumesnapshotcontent %s into cluster", snapCont.Name) continue default: - log.Infof("Created CSI volumesnapshotcontent %s", created.Name) + log.Infof("Created CSI volumesnapshotcontent %s", snapCont.Name) } } } } - c.deleteOrphanedBackups(location.Name, backupStoreBackups, log) + b.deleteOrphanedBackups(ctx, location.Name, backupStoreBackups, log) // update the location's last-synced time field statusPatch := client.MergeFrom(location.DeepCopy()) location.Status.LastSyncedTime = &metav1.Time{Time: time.Now().UTC()} - if err := c.kbClient.Patch(context.Background(), &location, statusPatch); err != nil { + if err := b.client.Patch(ctx, &location, statusPatch); err != nil { log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time") continue } } + + return ctrl.Result{}, nil +} + +// SetupWithManager is used to setup controller and its watching sources. 
+func (b *backupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error { + backupSyncSource := kube.NewPeriodicalEnqueueSource( + b.logger, + mgr.GetClient(), + &velerov1api.BackupStorageLocationList{}, + backupSyncReconcilePeriod, + // Only enqueue the first BSL + func(object client.Object) bool { + var bslList velerov1api.BackupStorageLocationList + b.client.List(context.Background(), &bslList, &client.ListOptions{ + Namespace: b.namespace, + }) + if bslList.Items[0].Namespace == object.GetNamespace() && + bslList.Items[0].Name == object.GetName() { + return true + } + return false + }, + ) + + return ctrl.NewControllerManagedBy(mgr). + For(&velerov1api.BackupStorageLocation{}). + // Filter all BSL events, because this controller is supposed to run periodically, not by event. + WithEventFilter(predicate.Funcs{ + CreateFunc: func(ce event.CreateEvent) bool { + return false + }, + UpdateFunc: func(ue event.UpdateEvent) bool { + return false + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + }). + Watches(backupSyncSource, nil). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). + Complete(b) } // deleteOrphanedBackups deletes backup objects (CRDs) from Kubernetes that have the specified location // and a phase of Completed, but no corresponding backup in object storage. 
-func (c *backupSyncController) deleteOrphanedBackups(locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) { - locationSelector := labels.Set(map[string]string{ - velerov1api.StorageLocationLabel: label.GetValidName(locationName), - }).AsSelector() - - backups, err := c.backupLister.Backups(c.namespace).List(locationSelector) +func (b *backupSyncReconciler) deleteOrphanedBackups(ctx context.Context, locationName string, backupStoreBackups sets.String, log logrus.FieldLogger) { + var backupList velerov1api.BackupList + listOption := client.ListOptions{ + LabelSelector: labels.Set(map[string]string{ + velerov1api.StorageLocationLabel: label.GetValidName(locationName), + }).AsSelector(), + } + err := b.client.List(ctx, &backupList, &listOption) if err != nil { log.WithError(errors.WithStack(err)).Error("Error listing backups from cluster") return } - if len(backups) == 0 { + + if len(backupList.Items) == 0 { return } - for _, backup := range backups { + for _, backup := range backupList.Items { log = log.WithField("backup", backup.Name) if backup.Status.Phase != velerov1api.BackupPhaseCompleted || backupStoreBackups.Has(backup.Name) { continue } - if err := c.backupClient.Backups(backup.Namespace).Delete(context.TODO(), backup.Name, metav1.DeleteOptions{}); err != nil { + + if err := b.client.Delete(ctx, &backup, &client.DeleteOptions{}); err != nil { log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster") } else { log.Debug("Deleted orphaned backup from cluster") - c.deleteCSISnapshotsByBackup(backup.Name, log) + b.deleteCSISnapshotsByBackup(ctx, backup.Name, log) } } } -func (c *backupSyncController) deleteCSISnapshotsByBackup(backupName string, log logrus.FieldLogger) { +func (b *backupSyncReconciler) deleteCSISnapshotsByBackup(ctx context.Context, backupName string, log logrus.FieldLogger) { if !features.IsEnabled(velerov1api.CSIFeatureFlag) { return } m := client.MatchingLabels{velerov1api.BackupNameLabel: 
label.GetValidName(backupName)} - if vsList, err := c.csiVSLister.List(label.NewSelectorForBackup(label.GetValidName(backupName))); err != nil { + var vsList snapshotv1api.VolumeSnapshotList + listOptions := &client.ListOptions{ + LabelSelector: label.NewSelectorForBackup(label.GetValidName(backupName)), + } + if err := b.client.List(ctx, &vsList, listOptions); err != nil { log.WithError(err).Warnf("Failed to list volumesnapshots for backup: %s, the deletion will be skipped", backupName) } else { - for _, vs := range vsList { + for _, vs := range vsList.Items { name := kube.NamespaceAndName(vs.GetObjectMeta()) log.Debugf("Deleting volumesnapshot %s", name) - if err := c.kbClient.Delete(context.TODO(), vs); err != nil { + if err := b.client.Delete(context.TODO(), &vs); err != nil { log.WithError(err).Warnf("Failed to delete volumesnapshot %s", name) } } } vsc := &snapshotv1api.VolumeSnapshotContent{} log.Debugf("Deleting volumesnapshotcontents for backup: %s", backupName) - if err := c.kbClient.DeleteAllOf(context.TODO(), vsc, m); err != nil { + if err := b.client.DeleteAllOf(context.TODO(), vsc, m); err != nil { log.WithError(err).Warnf("Failed to delete volumesnapshotcontents for backup: %s", backupName) } } + +// orderedBackupLocations returns a new slice with the default backup location first (if it exists), +// followed by the rest of the locations in no particular order. +func orderedBackupLocations(locationList *velerov1api.BackupStorageLocationList, defaultLocationName string) []velerov1api.BackupStorageLocation { + var result []velerov1api.BackupStorageLocation + + for i := range locationList.Items { + if locationList.Items[i].Name == defaultLocationName { + // put the default location first + result = append(result, locationList.Items[i]) + // append everything before the default + result = append(result, locationList.Items[:i]...) + // append everything after the default + result = append(result, locationList.Items[i+1:]...) 
+ + return result + } + } + + return locationList.Items +} diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index 6ba8253733..b20ec63967 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -18,21 +18,25 @@ package controller import ( "context" - "testing" + "fmt" "time" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" core "k8s.io/client-go/testing" + ctrl "sigs.k8s.io/controller-runtime" + ctrlClient "sigs.k8s.io/controller-runtime/pkg/client" + ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" - "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake" - informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions" "github.com/vmware-tanzu/velero/pkg/label" persistencemocks "github.com/vmware-tanzu/velero/pkg/persistence/mocks" "github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt" @@ -107,264 +111,266 @@ func defaultLocationsListWithLongerLocationName(namespace string) []*velerov1api } } -func TestBackupSyncControllerRun(t *testing.T) { - type cloudBackupData struct { - backup *velerov1api.Backup - podVolumeBackups []*velerov1api.PodVolumeBackup +func numBackups(c ctrlClient.WithWatch, ns string) (int, error) { + var existingK8SBackups velerov1api.BackupList + err := c.List(context.TODO(), &existingK8SBackups, &ctrlClient.ListOptions{}) + if err != nil { + return 0, err } - tests := []struct { - name string - namespace string - locations []*velerov1api.BackupStorageLocation - cloudBuckets map[string][]*cloudBackupData - 
existingBackups []*velerov1api.Backup - existingPodVolumeBackups []*velerov1api.PodVolumeBackup - longLocationNameEnabled bool - }{ - { - name: "no cloud backups", - }, - { - name: "normal case", - namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, - }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), - }, - }, + return len(existingK8SBackups.Items), nil +} + +var _ = Describe("Backup Sync Reconciler", func() { + It("Test Backup Sync Reconciler basic function", func() { + type cloudBackupData struct { + backup *velerov1api.Backup + podVolumeBackups []*velerov1api.PodVolumeBackup + } + + tests := []struct { + name string + namespace string + locations []*velerov1api.BackupStorageLocation + cloudBuckets map[string][]*cloudBackupData + existingBackups []*velerov1api.Backup + existingPodVolumeBackups []*velerov1api.PodVolumeBackup + longLocationNameEnabled bool + }{ + { + name: "no cloud backups", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), }, - }, - { - name: "all synced backups get created in Velero server's namespace", - namespace: "velero", - locations: defaultLocationsList("velero"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, - }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-2", "backup-3").Result(), + { + name: "normal case", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").Result(), + }, + &cloudBackupData{ + 
backup: builder.ForBackup("ns-1", "backup-2").Result(), + }, }, - &cloudBackupData{ - backup: builder.ForBackup("velero", "backup-4").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-3").Result(), + }, }, }, }, - }, - { - name: "new backups get synced when some cloud backups already exist in the cluster", - namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, - }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-4").Result(), + { + name: "all synced backups get created in Velero server's namespace", + namespace: "velero", + locations: defaultLocationsList("velero"), + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-2").Result(), + }, }, - }, - }, - existingBackups: []*velerov1api.Backup{ - // add a label to each existing backup so we can differentiate it from the cloud - // backup during verification - builder.ForBackup("ns-1", "backup-1").StorageLocation("location-1").ObjectMeta(builder.WithLabels("i-exist", "true")).Result(), - builder.ForBackup("ns-1", "backup-3").StorageLocation("location-2").ObjectMeta(builder.WithLabels("i-exist", "true")).Result(), - }, - }, - { - name: "existing backups without a StorageLocation get it filled in", - namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-2", 
"backup-3").Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("velero", "backup-4").Result(), + }, }, }, }, - existingBackups: []*velerov1api.Backup{ - // add a label to each existing backup so we can differentiate it from the cloud - // backup during verification - builder.ForBackup("ns-1", "backup-1").ObjectMeta(builder.WithLabels("i-exist", "true")).StorageLocation("location-1").Result(), - }, - }, - { - name: "backup storage location names and labels get updated", - namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), + { + name: "new backups get synced when some cloud backups already exist in the cluster", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-2").Result(), + }, }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-3").Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-4").Result(), + }, }, }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").StorageLocation("bar").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "bar")).Result(), - }, + existingBackups: []*velerov1api.Backup{ + // add a label to each existing backup so we can differentiate it from the cloud + // backup during verification + builder.ForBackup("ns-1", "backup-1").StorageLocation("location-1").ObjectMeta(builder.WithLabels("i-exist", "true")).Result(), + builder.ForBackup("ns-1", 
"backup-3").StorageLocation("location-2").ObjectMeta(builder.WithLabels("i-exist", "true")).Result(), }, }, - }, - { - name: "backup storage location names and labels get updated with location name greater than 63 chars", - namespace: "ns-1", - locations: defaultLocationsListWithLongerLocationName("ns-1"), - longLocationNameEnabled: true, - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), + { + name: "existing backups without a StorageLocation get it filled in", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").Result(), + }, }, }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").StorageLocation("bar").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "bar")).Result(), - }, + existingBackups: []*velerov1api.Backup{ + // add a label to each existing backup so we can differentiate it from the cloud + // backup during verification + builder.ForBackup("ns-1", "backup-1").ObjectMeta(builder.WithLabels("i-exist", "true")).StorageLocation("location-1").Result(), }, }, - }, - { - name: "all synced backups and pod volume backups get created in Velero server's namespace", - namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + { + name: "backup storage location names and labels get updated", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + 
cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-2").Result(), }, }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-3").StorageLocation("bar").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "bar")).Result(), }, }, }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), + }, + { + name: "backup storage location names and labels get updated with location name greater than 63 chars", + namespace: "ns-1", + locations: defaultLocationsListWithLongerLocationName("ns-1"), + longLocationNameEnabled: true, + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-2").Result(), + }, }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-4").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-3").StorageLocation("bar").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "bar")).Result(), }, }, }, }, - }, - { - name: "new pod volume backups get synched when some pod volume backups already exist in the cluster", - namespace: "ns-1", - 
locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + { + name: "all synced backups and pod volume backups get created in Velero server's namespace", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + }, + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-2").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), + }, }, }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-3").Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-4").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), + builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), + }, }, }, }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), + }, + { + name: "new pod volume backups get synched when some pod volume backups already exist in the cluster", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBuckets: map[string][]*cloudBackupData{ + "bucket-1": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-1").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + 
builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + }, + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-2").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), + }, + }, }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-4").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-5").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-6").Result(), + "bucket-2": { + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-3").Result(), + }, + &cloudBackupData{ + backup: builder.ForBackup("ns-1", "backup-4").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + builder.ForPodVolumeBackup("ns-1", "pvb-5").Result(), + builder.ForPodVolumeBackup("ns-1", "pvb-6").Result(), + }, }, }, }, + existingPodVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), + builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), + }, }, - existingPodVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), - }, - }, - } + } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { + for _, test := range tests { var ( - client = fake.NewSimpleClientset() - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) - sharedInformers = informers.NewSharedInformerFactory(client, 0) - pluginManager = &pluginmocks.Manager{} - backupStores = make(map[string]*persistencemocks.BackupStore) + client = ctrlfake.NewClientBuilder().Build() + pluginManager = &pluginmocks.Manager{} + backupStores = make(map[string]*persistencemocks.BackupStore) ) - c := NewBackupSyncController( - client.VeleroV1(), - fakeClient, - client.VeleroV1(), - 
sharedInformers.Velero().V1().Backups().Lister(), - nil, // csiVSLister - time.Duration(0), - test.namespace, - nil, // csiSnapshotClient - nil, // kubeClient - "", - func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, - NewFakeObjectBackupStoreGetter(backupStores), - velerotest.NewLogger(), - ).(*backupSyncController) - pluginManager.On("CleanupClients").Return(nil) + r := backupSyncReconciler{ + client: client, + namespace: test.namespace, + defaultBackupSyncPeriod: time.Second * 10, + newPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, + backupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), + logger: velerotest.NewLogger(), + } for _, location := range test.locations { - require.NoError(t, fakeClient.Create(context.Background(), location)) + Expect(r.client.Create(ctx, location)).ShouldNot(HaveOccurred()) backupStores[location.Name] = &persistencemocks.BackupStore{} } for _, location := range test.locations { backupStore, ok := backupStores[location.Name] - require.True(t, ok, "no mock backup store for location %s", location.Name) + Expect(ok).To(BeTrue(), "no mock backup store for location %s", location.Name) var backupNames []string for _, bucket := range test.cloudBuckets[location.Spec.ObjectStorage.Bucket] { @@ -376,21 +382,21 @@ func TestBackupSyncControllerRun(t *testing.T) { } for _, existingBackup := range test.existingBackups { - require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(existingBackup)) - - _, err := client.VeleroV1().Backups(test.namespace).Create(context.TODO(), existingBackup, metav1.CreateOptions{}) - require.NoError(t, err) + err := client.Create(context.TODO(), existingBackup, &ctrlClient.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) } for _, existingPodVolumeBackup := range test.existingPodVolumeBackups { - require.NoError(t, sharedInformers.Velero().V1().PodVolumeBackups().Informer().GetStore().Add(existingPodVolumeBackup)) - - 
_, err := client.VeleroV1().PodVolumeBackups(test.namespace).Create(context.TODO(), existingPodVolumeBackup, metav1.CreateOptions{}) - require.NoError(t, err) + err := client.Create(context.TODO(), existingPodVolumeBackup, &ctrlClient.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) } - client.ClearActions() - c.run() + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: "ns-1"}, + }) + + Expect(actualResult).To(BeEquivalentTo(ctrl.Result{})) + Expect(err).To(BeNil()) for bucket, backupDataSet := range test.cloudBuckets { // figure out which location this bucket is for; we need this for verification @@ -402,12 +408,18 @@ func TestBackupSyncControllerRun(t *testing.T) { break } } - require.NotNil(t, location) + Expect(location).NotTo(BeNil()) // process the cloud backups for _, cloudBackupData := range backupDataSet { - obj, err := client.VeleroV1().Backups(test.namespace).Get(context.TODO(), cloudBackupData.backup.Name, metav1.GetOptions{}) - require.NoError(t, err) + obj := &velerov1api.Backup{} + err := client.Get( + context.TODO(), + types.NamespacedName{ + Namespace: cloudBackupData.backup.Namespace, + Name: cloudBackupData.backup.Name}, + obj) + Expect(err).To(BeNil()) // did this cloud backup already exist in the cluster? 
var existing *velerov1api.Backup @@ -426,23 +438,30 @@ func TestBackupSyncControllerRun(t *testing.T) { expected := existing.DeepCopy() expected.Spec.StorageLocation = location.Name - assert.Equal(t, expected, obj) + Expect(expected).To(BeEquivalentTo(obj)) } else { // verify that the storage location field and label are set properly - assert.Equal(t, location.Name, obj.Spec.StorageLocation) + Expect(location.Name).To(BeEquivalentTo(obj.Spec.StorageLocation)) locationName := location.Name if test.longLocationNameEnabled { locationName = label.GetValidName(locationName) } - assert.Equal(t, locationName, obj.Labels[velerov1api.StorageLocationLabel]) - assert.Equal(t, true, len(obj.Labels[velerov1api.StorageLocationLabel]) <= validation.DNS1035LabelMaxLength) + Expect(locationName).To(BeEquivalentTo(obj.Labels[velerov1api.StorageLocationLabel])) + Expect(len(obj.Labels[velerov1api.StorageLocationLabel]) <= validation.DNS1035LabelMaxLength).To(BeTrue()) } // process the cloud pod volume backups for this backup, if any for _, podVolumeBackup := range cloudBackupData.podVolumeBackups { - objPodVolumeBackup, err := client.VeleroV1().PodVolumeBackups(test.namespace).Get(context.TODO(), podVolumeBackup.Name, metav1.GetOptions{}) - require.NoError(t, err) + objPodVolumeBackup := &velerov1api.PodVolumeBackup{} + err := client.Get( + context.TODO(), + types.NamespacedName{ + Namespace: podVolumeBackup.Namespace, + Name: podVolumeBackup.Name, + }, + objPodVolumeBackup) + Expect(err).ShouldNot(HaveOccurred()) // did this cloud pod volume backup already exist in the cluster? var existingPodVolumeBackup *velerov1api.PodVolumeBackup @@ -457,135 +476,153 @@ func TestBackupSyncControllerRun(t *testing.T) { // if this cloud pod volume backup already exists in the cluster, make sure that what we get from the // client is the existing backup, not the cloud one. 
expected := existingPodVolumeBackup.DeepCopy() - assert.Equal(t, expected, objPodVolumeBackup) + Expect(expected).To(BeEquivalentTo(objPodVolumeBackup)) } } } } - }) - } -} + } + }) -func TestDeleteOrphanedBackups(t *testing.T) { - baseBuilder := func(name string) *builder.BackupBuilder { - return builder.ForBackup("ns-1", name).ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "default")) - } + It("Test deleting orphaned backups.", func() { + longLabelName := "the-really-long-location-name-that-is-much-more-than-63-characters" - tests := []struct { - name string - cloudBackups sets.String - k8sBackups []*velerov1api.Backup - namespace string - expectedDeletes sets.String - }{ - { - name: "no overlapping backups", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - baseBuilder("backupA").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backupB").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backupC").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder := func(name string) *builder.BackupBuilder { + return builder.ForBackup("ns-1", name).ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "default")) + } + + tests := []struct { + name string + cloudBackups sets.String + k8sBackups []*velerov1api.Backup + namespace string + expectedDeletes sets.String + useLongBSLName bool + }{ + { + name: "no overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + baseBuilder("backupA").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backupB").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backupC").Phase(velerov1api.BackupPhaseCompleted).Result(), + }, + expectedDeletes: sets.NewString("backupA", "backupB", "backupC"), }, - expectedDeletes: sets.NewString("backupA", "backupB", "backupC"), - }, - { - name: 
"some overlapping backups", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - baseBuilder("backup-1").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-2").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-C").Phase(velerov1api.BackupPhaseCompleted).Result(), + { + name: "some overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + baseBuilder("backup-1").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-2").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-C").Phase(velerov1api.BackupPhaseCompleted).Result(), + }, + expectedDeletes: sets.NewString("backup-C"), }, - expectedDeletes: sets.NewString("backup-C"), - }, - { - name: "all overlapping backups", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - baseBuilder("backup-1").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-2").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-3").Phase(velerov1api.BackupPhaseCompleted).Result(), + { + name: "all overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + baseBuilder("backup-1").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-2").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-3").Phase(velerov1api.BackupPhaseCompleted).Result(), + }, + expectedDeletes: sets.NewString(), }, - expectedDeletes: sets.NewString(), - }, - { - name: "no overlapping backups but including backups that are not complete", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - 
baseBuilder("backupA").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("Deleting").Phase(velerov1api.BackupPhaseDeleting).Result(), - baseBuilder("Failed").Phase(velerov1api.BackupPhaseFailed).Result(), - baseBuilder("FailedValidation").Phase(velerov1api.BackupPhaseFailedValidation).Result(), - baseBuilder("InProgress").Phase(velerov1api.BackupPhaseInProgress).Result(), - baseBuilder("New").Phase(velerov1api.BackupPhaseNew).Result(), + { + name: "no overlapping backups but including backups that are not complete", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + baseBuilder("backupA").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("Deleting").Phase(velerov1api.BackupPhaseDeleting).Result(), + baseBuilder("Failed").Phase(velerov1api.BackupPhaseFailed).Result(), + baseBuilder("FailedValidation").Phase(velerov1api.BackupPhaseFailedValidation).Result(), + baseBuilder("InProgress").Phase(velerov1api.BackupPhaseInProgress).Result(), + baseBuilder("New").Phase(velerov1api.BackupPhaseNew).Result(), + }, + expectedDeletes: sets.NewString("backupA"), }, - expectedDeletes: sets.NewString("backupA"), - }, - { - name: "all overlapping backups and all backups that are not complete", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - baseBuilder("backup-1").Phase(velerov1api.BackupPhaseFailed).Result(), - baseBuilder("backup-2").Phase(velerov1api.BackupPhaseFailedValidation).Result(), - baseBuilder("backup-3").Phase(velerov1api.BackupPhaseInProgress).Result(), + { + name: "all overlapping backups and all backups that are not complete", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + baseBuilder("backup-1").Phase(velerov1api.BackupPhaseFailed).Result(), + 
baseBuilder("backup-2").Phase(velerov1api.BackupPhaseFailedValidation).Result(), + baseBuilder("backup-3").Phase(velerov1api.BackupPhaseInProgress).Result(), + }, + expectedDeletes: sets.NewString(), }, - expectedDeletes: sets.NewString(), - }, - { - name: "no completed backups in other locations are deleted", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - baseBuilder("backup-1").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-2").Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-C").Phase(velerov1api.BackupPhaseCompleted).Result(), - - baseBuilder("backup-4").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "alternate")).Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-5").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "alternate")).Phase(velerov1api.BackupPhaseCompleted).Result(), - baseBuilder("backup-6").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "alternate")).Phase(velerov1api.BackupPhaseCompleted).Result(), + { + name: "no completed backups in other locations are deleted", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + baseBuilder("backup-1").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-2").Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-C").Phase(velerov1api.BackupPhaseCompleted).Result(), + + baseBuilder("backup-4").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "alternate")).Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-5").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "alternate")).Phase(velerov1api.BackupPhaseCompleted).Result(), + baseBuilder("backup-6").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, 
"alternate")).Phase(velerov1api.BackupPhaseCompleted).Result(), + }, + expectedDeletes: sets.NewString("backup-C"), }, - expectedDeletes: sets.NewString("backup-C"), - }, - } + { + name: "some overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*velerov1api.Backup{ + builder.ForBackup("ns-1", "backup-1"). + ObjectMeta( + builder.WithLabels(velerov1api.StorageLocationLabel, "the-really-long-location-name-that-is-much-more-than-63-c69e779"), + ). + Phase(velerov1api.BackupPhaseCompleted). + Result(), + builder.ForBackup("ns-1", "backup-2"). + ObjectMeta( + builder.WithLabels(velerov1api.StorageLocationLabel, "the-really-long-location-name-that-is-much-more-than-63-c69e779"), + ). + Phase(velerov1api.BackupPhaseCompleted). + Result(), + builder.ForBackup("ns-1", "backup-C"). + ObjectMeta( + builder.WithLabels(velerov1api.StorageLocationLabel, "the-really-long-location-name-that-is-much-more-than-63-c69e779"), + ). + Phase(velerov1api.BackupPhaseCompleted). 
+ Result(), + }, + expectedDeletes: sets.NewString("backup-C"), + useLongBSLName: true, + }, + } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { + for _, test := range tests { var ( - client = fake.NewSimpleClientset() - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) - sharedInformers = informers.NewSharedInformerFactory(client, 0) + client = ctrlfake.NewClientBuilder().Build() + pluginManager = &pluginmocks.Manager{} + backupStores = make(map[string]*persistencemocks.BackupStore) ) - c := NewBackupSyncController( - client.VeleroV1(), - fakeClient, - client.VeleroV1(), - sharedInformers.Velero().V1().Backups().Lister(), - nil, // csiVSLister - time.Duration(0), - test.namespace, - nil, // csiSnapshotClient - nil, // kubeClient - "", - nil, // new plugin manager func - nil, // backupStoreGetter - velerotest.NewLogger(), - ).(*backupSyncController) + r := backupSyncReconciler{ + client: client, + namespace: test.namespace, + defaultBackupSyncPeriod: time.Second * 10, + newPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, + backupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), + logger: velerotest.NewLogger(), + } expectedDeleteActions := make([]core.Action, 0) for _, backup := range test.k8sBackups { - // add test backup to informer - require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(backup), "Error adding backup to informer") - // add test backup to client - _, err := client.VeleroV1().Backups(test.namespace).Create(context.TODO(), backup, metav1.CreateOptions{}) - require.NoError(t, err, "Error adding backup to clientset") + err := client.Create(context.TODO(), backup, &ctrlClient.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) // if we expect this backup to be deleted, set up the expected DeleteAction if test.expectedDeletes.Has(backup.Name) { @@ -598,140 +635,19 @@ func TestDeleteOrphanedBackups(t *testing.T) { } } - 
c.deleteOrphanedBackups("default", test.cloudBackups, velerotest.NewLogger()) - - numBackups, err := numBackups(t, client, c.namespace) - assert.NoError(t, err) - - expected := len(test.k8sBackups) - len(test.expectedDeletes) - assert.Equal(t, expected, numBackups) - - velerotest.CompareActions(t, expectedDeleteActions, getDeleteActions(client.Actions())) - }) - } -} - -func TestStorageLabelsInDeleteOrphanedBackups(t *testing.T) { - longLabelName := "the-really-long-location-name-that-is-much-more-than-63-characters" - tests := []struct { - name string - cloudBackups sets.String - k8sBackups []*velerov1api.Backup - namespace string - expectedDeletes sets.String - }{ - { - name: "some overlapping backups", - namespace: "ns-1", - cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), - k8sBackups: []*velerov1api.Backup{ - builder.ForBackup("ns-1", "backup-1"). - ObjectMeta( - builder.WithLabels(velerov1api.StorageLocationLabel, "the-really-long-location-name-that-is-much-more-than-63-c69e779"), - ). - Phase(velerov1api.BackupPhaseCompleted). - Result(), - builder.ForBackup("ns-1", "backup-2"). - ObjectMeta( - builder.WithLabels(velerov1api.StorageLocationLabel, "the-really-long-location-name-that-is-much-more-than-63-c69e779"), - ). - Phase(velerov1api.BackupPhaseCompleted). - Result(), - builder.ForBackup("ns-1", "backup-C"). - ObjectMeta( - builder.WithLabels(velerov1api.StorageLocationLabel, "the-really-long-location-name-that-is-much-more-than-63-c69e779"), - ). - Phase(velerov1api.BackupPhaseCompleted). 
- Result(), - }, - expectedDeletes: sets.NewString("backup-C"), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - client = fake.NewSimpleClientset() - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) - sharedInformers = informers.NewSharedInformerFactory(client, 0) - ) - - c := NewBackupSyncController( - client.VeleroV1(), - fakeClient, - client.VeleroV1(), - sharedInformers.Velero().V1().Backups().Lister(), - nil, // csiVSLister - time.Duration(0), - test.namespace, - nil, // csiSnapshotClient - nil, // kubeClient - "", - nil, // new plugin manager func - nil, // backupStoreGetter - velerotest.NewLogger(), - ).(*backupSyncController) - - expectedDeleteActions := make([]core.Action, 0) - - for _, backup := range test.k8sBackups { - // add test backup to informer - require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(backup), "Error adding backup to informer") - - // add test backup to client - _, err := client.VeleroV1().Backups(test.namespace).Create(context.TODO(), backup, metav1.CreateOptions{}) - require.NoError(t, err, "Error adding backup to clientset") - - // if we expect this backup to be deleted, set up the expected DeleteAction - if test.expectedDeletes.Has(backup.Name) { - actionDelete := core.NewDeleteAction( - velerov1api.SchemeGroupVersion.WithResource("backups"), - test.namespace, - backup.Name, - ) - expectedDeleteActions = append(expectedDeleteActions, actionDelete) - } + bslName := "default" + if test.useLongBSLName { + bslName = longLabelName } + r.deleteOrphanedBackups(ctx, bslName, test.cloudBackups, velerotest.NewLogger()) - c.deleteOrphanedBackups(longLabelName, test.cloudBackups, velerotest.NewLogger()) + numBackups, err := numBackups(client, r.namespace) + Expect(err).ShouldNot(HaveOccurred()) - numBackups, err := numBackups(t, client, c.namespace) - assert.NoError(t, err) + fmt.Println("") expected := len(test.k8sBackups) - len(test.expectedDeletes) - 
assert.Equal(t, expected, numBackups) - - velerotest.CompareActions(t, expectedDeleteActions, getDeleteActions(client.Actions())) - }) - } -} - -func getDeleteActions(actions []core.Action) []core.Action { - var deleteActions []core.Action - for _, action := range actions { - if action.GetVerb() == "delete" { - deleteActions = append(deleteActions, action) + Expect(expected).To(BeEquivalentTo(numBackups)) } - } - return deleteActions -} - -func numBackups(t *testing.T, c *fake.Clientset, ns string) (int, error) { - t.Helper() - existingK8SBackups, err := c.VeleroV1().Backups(ns).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return 0, err - } - - return len(existingK8SBackups.Items), nil -} - -func numPodVolumeBackups(t *testing.T, c *fake.Clientset, ns string) (int, error) { - t.Helper() - existingK8SPodvolumeBackups, err := c.VeleroV1().PodVolumeBackups(ns).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return 0, err - } - - return len(existingK8SPodvolumeBackups.Items), nil -} + }) +}) From 66ce7403510d72dcc28616a9cc1c0c5f1389b473 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Thu, 25 Aug 2022 14:10:18 +0800 Subject: [PATCH 2/2] Add Option for PeriodicalEnqueueSource. 
Signed-off-by: Xun Jiang --- pkg/controller/backup_deletion_controller.go | 2 +- .../backup_storage_location_controller.go | 11 +- pkg/controller/backup_sync_controller.go | 393 +++++++++-------- pkg/controller/backup_sync_controller_test.go | 414 ++++++++---------- pkg/controller/gc_controller.go | 2 +- .../restic_repository_controller.go | 2 +- pkg/controller/schedule_controller.go | 2 +- pkg/util/kube/periodical_enqueue_source.go | 35 +- .../kube/periodical_enqueue_source_test.go | 82 +++- 9 files changed, 504 insertions(+), 439 deletions(-) diff --git a/pkg/controller/backup_deletion_controller.go b/pkg/controller/backup_deletion_controller.go index a616dcb711..8d988d4499 100644 --- a/pkg/controller/backup_deletion_controller.go +++ b/pkg/controller/backup_deletion_controller.go @@ -91,7 +91,7 @@ func NewBackupDeletionReconciler( func (r *backupDeletionReconciler) SetupWithManager(mgr ctrl.Manager) error { // Make sure the expired requests can be deleted eventually - s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour) + s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour, kube.PeriodicalEnqueueSourceOption{}) return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.DeleteBackupRequest{}). Watches(s, nil). diff --git a/pkg/controller/backup_storage_location_controller.go b/pkg/controller/backup_storage_location_controller.go index 036e8dc8e7..3791e01e68 100644 --- a/pkg/controller/backup_storage_location_controller.go +++ b/pkg/controller/backup_storage_location_controller.go @@ -207,10 +207,13 @@ func (r *backupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) err mgr.GetClient(), &velerov1api.BackupStorageLocationList{}, bslValidationEnqueuePeriod, - // Add filter function to enqueue BSL per ValidationFrequency setting. 
- func(object client.Object) bool { - location := object.(*velerov1api.BackupStorageLocation) - return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation)) + kube.PeriodicalEnqueueSourceOption{ + FilterFuncs: []func(object client.Object) bool{ + func(object client.Object) bool { + location := object.(*velerov1api.BackupStorageLocation) + return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation)) + }, + }, }, ) return ctrl.NewControllerManagedBy(mgr). diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index f900c1aab2..cc79bbfe81 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -20,17 +20,20 @@ import ( "context" "time" + "github.com/apex/log" snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" kuberrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "github.com/vmware-tanzu/velero/pkg/util/kube" - "github.com/vmware-tanzu/velero/internal/storage" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/features" "github.com/vmware-tanzu/velero/pkg/label" @@ -39,7 +42,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ 
-78,230 +80,198 @@ func NewBackupSyncReconciler( // Reconcile syncs between the backups in cluster and backups metadata in object store. func (b *backupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := b.logger.WithField("controller", BackupSync) - log.Debug("Checking for existing backup storage locations to sync into cluster.") + log = log.WithField("backupLocation", req.String()) + log.Debug("Begin to sync between backups' metadata in BSL object storage and cluster's existing backups.") - locationList, err := storage.ListBackupStorageLocations(ctx, b.client, b.namespace) + location := &velerov1api.BackupStorageLocation{} + err := b.client.Get(ctx, req.NamespacedName, location) if err != nil { - log.WithError(err).Error("No backup storage locations found, at least one is required") - return ctrl.Result{Requeue: false}, err - } - - // sync the default backup storage location first, if it exists - defaultBackupLocationName := "" - for _, location := range locationList.Items { - if location.Spec.Default { - defaultBackupLocationName = location.Name - break + if apierrors.IsNotFound(err) { + log.Debug("BackupStorageLocation is not found") + return ctrl.Result{}, nil } + return ctrl.Result{}, errors.Wrapf(err, "error getting BackupStorageLocation %s", req.String()) } - locations := orderedBackupLocations(&locationList, defaultBackupLocationName) pluginManager := b.newPluginManager(log) defer pluginManager.CleanupClients() - for _, location := range locations { - log := log.WithField("backupLocation", location.Name) + log.Debug("Checking backup location for backups to sync into cluster") - syncPeriod := b.defaultBackupSyncPeriod - if location.Spec.BackupSyncPeriod != nil { - syncPeriod = location.Spec.BackupSyncPeriod.Duration - if syncPeriod == 0 { - log.Debug("Backup sync period for this location is set to 0, skipping sync") - continue - } + backupStore, err := b.backupStoreGetter.Get(location, pluginManager, log) + if err 
!= nil { + log.WithError(err).Error("Error getting backup store for this location") + return ctrl.Result{}, nil + } - if syncPeriod < 0 { - log.Debug("Backup sync period must be non-negative") - syncPeriod = b.defaultBackupSyncPeriod - } - } + // get a list of all the backups that are stored in the backup storage location + res, err := backupStore.ListBackups() + if err != nil { + log.WithError(err).Error("Error listing backups in backup store") + return ctrl.Result{}, nil + } + backupStoreBackups := sets.NewString(res...) + log.WithField("backupCount", len(backupStoreBackups)).Debug("Got backups from backup store") - lastSync := location.Status.LastSyncedTime - if lastSync != nil { - log.Debug("Checking if backups need to be synced at this time for this location") - nextSync := lastSync.Add(syncPeriod) - if time.Now().UTC().Before(nextSync) { - continue - } - } + // get a list of all the backups that exist as custom resources in the cluster + var clusterBackupList velerov1api.BackupList + listOption := client.ListOptions{ + LabelSelector: labels.Everything(), + Namespace: b.namespace, + } - log.Debug("Checking backup location for backups to sync into cluster") + err = b.client.List(ctx, &clusterBackupList, &listOption) + if err != nil { + log.WithError(errors.WithStack(err)).Error("Error getting backups from cluster, proceeding with sync into cluster") + } else { + log.WithField("backupCount", len(clusterBackupList.Items)).Debug("Got backups from cluster") + } - backupStore, err := b.backupStoreGetter.Get(&location, pluginManager, log) - if err != nil { - log.WithError(err).Error("Error getting backup store for this location") - continue - } + // get a list of backups that *are* in the backup storage location and *aren't* in the cluster + clusterBackupsSet := sets.NewString() + for _, b := range clusterBackupList.Items { + clusterBackupsSet.Insert(b.Name) + } + backupsToSync := backupStoreBackups.Difference(clusterBackupsSet) + + if count := backupsToSync.Len(); 
count > 0 { + log.Infof("Found %v backups in the backup location that do not exist in the cluster and need to be synced", count) + } else { + log.Debug("No backups found in the backup location that need to be synced into the cluster") + } + + // sync each backup + for backupName := range backupsToSync { + log = log.WithField("backup", backupName) + log.Info("Attempting to sync backup into cluster") - // get a list of all the backups that are stored in the backup storage location - res, err := backupStore.ListBackups() + backup, err := backupStore.GetBackupMetadata(backupName) if err != nil { - log.WithError(err).Error("Error listing backups in backup store") + log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store") continue } - backupStoreBackups := sets.NewString(res...) - log.WithField("backupCount", len(backupStoreBackups)).Debug("Got backups from backup store") - - // get a list of all the backups that exist as custom resources in the cluster - var clusterBackupList velerov1api.BackupList - listOption := client.ListOptions{ - LabelSelector: labels.Everything(), - Namespace: b.namespace, - } - err = b.client.List(ctx, &clusterBackupList, &listOption) - if err != nil { - log.WithError(errors.WithStack(err)).Error("Error getting backups from cluster, proceeding with sync into cluster") - } else { - log.WithField("backupCount", len(clusterBackupList.Items)).Debug("Got backups from cluster") + backup.Namespace = b.namespace + backup.ResourceVersion = "" + + // update the StorageLocation field and label since the name of the location + // may be different in this cluster than in the cluster that created the + // backup. 
+ backup.Spec.StorageLocation = location.Name + if backup.Labels == nil { + backup.Labels = make(map[string]string) } + backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation) - // get a list of backups that *are* in the backup storage location and *aren't* in the cluster - clusterBackupsSet := sets.NewString() - for _, b := range clusterBackupList.Items { - clusterBackupsSet.Insert(b.Name) + // attempt to create backup custom resource via API + err = b.client.Create(ctx, backup, &client.CreateOptions{}) + switch { + case err != nil && kuberrs.IsAlreadyExists(err): + log.Debug("Backup already exists in cluster") + continue + case err != nil && !kuberrs.IsAlreadyExists(err): + log.WithError(errors.WithStack(err)).Error("Error syncing backup into cluster") + continue + default: + log.Info("Successfully synced backup into cluster") } - backupsToSync := backupStoreBackups.Difference(clusterBackupsSet) - if count := backupsToSync.Len(); count > 0 { - log.Infof("Found %v backups in the backup location that do not exist in the cluster and need to be synced", count) - } else { - log.Debug("No backups found in the backup location that need to be synced into the cluster") + // process the pod volume backups from object store, if any + podVolumeBackups, err := backupStore.GetPodVolumeBackups(backupName) + if err != nil { + log.WithError(errors.WithStack(err)).Error("Error getting pod volume backups for this backup from backup store") + continue } - // sync each backup - for backupName := range backupsToSync { - log = log.WithField("backup", backupName) - log.Info("Attempting to sync backup into cluster") + for _, podVolumeBackup := range podVolumeBackups { + log := log.WithField("podVolumeBackup", podVolumeBackup.Name) + log.Debug("Checking this pod volume backup to see if it needs to be synced into the cluster") - backup, err := backupStore.GetBackupMetadata(backupName) - if err != nil { - 
log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store") - continue + for i, ownerRef := range podVolumeBackup.OwnerReferences { + if ownerRef.APIVersion == velerov1api.SchemeGroupVersion.String() && ownerRef.Kind == "Backup" && ownerRef.Name == backup.Name { + log.WithField("uid", backup.UID).Debugf("Updating pod volume backup's owner reference UID") + podVolumeBackup.OwnerReferences[i].UID = backup.UID + } } - backup.Namespace = b.namespace - backup.ResourceVersion = "" - - // update the StorageLocation field and label since the name of the location - // may be different in this cluster than in the cluster that created the - // backup. - backup.Spec.StorageLocation = location.Name - if backup.Labels == nil { - backup.Labels = make(map[string]string) + if _, ok := podVolumeBackup.Labels[velerov1api.BackupUIDLabel]; ok { + podVolumeBackup.Labels[velerov1api.BackupUIDLabel] = string(backup.UID) } - backup.Labels[velerov1api.StorageLocationLabel] = label.GetValidName(backup.Spec.StorageLocation) - // attempt to create backup custom resource via API - err = b.client.Create(ctx, backup, &client.CreateOptions{}) + podVolumeBackup.Namespace = backup.Namespace + podVolumeBackup.ResourceVersion = "" + + err = b.client.Create(ctx, podVolumeBackup, &client.CreateOptions{}) switch { case err != nil && kuberrs.IsAlreadyExists(err): - log.Debug("Backup already exists in cluster") + log.Debug("Pod volume backup already exists in cluster") continue case err != nil && !kuberrs.IsAlreadyExists(err): - log.WithError(errors.WithStack(err)).Error("Error syncing backup into cluster") + log.WithError(errors.WithStack(err)).Error("Error syncing pod volume backup into cluster") continue default: - log.Info("Successfully synced backup into cluster") + log.Debug("Synced pod volume backup into cluster") } + } - // process the pod volume backups from object store, if any - podVolumeBackups, err := backupStore.GetPodVolumeBackups(backupName) + if 
features.IsEnabled(velerov1api.CSIFeatureFlag) { + // we are syncing these objects only to ensure that the storage snapshots are cleaned up + // on backup deletion or expiry. + log.Info("Syncing CSI VolumeSnapshotClasses in backup") + vsClasses, err := backupStore.GetCSIVolumeSnapshotClasses(backupName) if err != nil { - log.WithError(errors.WithStack(err)).Error("Error getting pod volume backups for this backup from backup store") + log.WithError(errors.WithStack(err)).Error("Error getting CSI VolumeSnapClasses for this backup from backup store") continue } - - for _, podVolumeBackup := range podVolumeBackups { - log := log.WithField("podVolumeBackup", podVolumeBackup.Name) - log.Debug("Checking this pod volume backup to see if it needs to be synced into the cluster") - - for i, ownerRef := range podVolumeBackup.OwnerReferences { - if ownerRef.APIVersion == velerov1api.SchemeGroupVersion.String() && ownerRef.Kind == "Backup" && ownerRef.Name == backup.Name { - log.WithField("uid", backup.UID).Debugf("Updating pod volume backup's owner reference UID") - podVolumeBackup.OwnerReferences[i].UID = backup.UID - } - } - - if _, ok := podVolumeBackup.Labels[velerov1api.BackupUIDLabel]; ok { - podVolumeBackup.Labels[velerov1api.BackupUIDLabel] = string(backup.UID) - } - - podVolumeBackup.Namespace = backup.Namespace - podVolumeBackup.ResourceVersion = "" - - err = b.client.Create(ctx, podVolumeBackup, &client.CreateOptions{}) + for _, vsClass := range vsClasses { + vsClass.ResourceVersion = "" + err := b.client.Create(ctx, vsClass, &client.CreateOptions{}) switch { case err != nil && kuberrs.IsAlreadyExists(err): - log.Debug("Pod volume backup already exists in cluster") + log.Debugf("VolumeSnapshotClass %s already exists in cluster", vsClass.Name) continue case err != nil && !kuberrs.IsAlreadyExists(err): - log.WithError(errors.WithStack(err)).Error("Error syncing pod volume backup into cluster") + log.WithError(errors.WithStack(err)).Errorf("Error syncing 
VolumeSnapshotClass %s into cluster", vsClass.Name) continue default: - log.Debug("Synced pod volume backup into cluster") + log.Infof("Created CSI VolumeSnapshotClass %s", vsClass.Name) } } - if features.IsEnabled(velerov1api.CSIFeatureFlag) { - // we are syncing these objects only to ensure that the storage snapshots are cleaned up - // on backup deletion or expiry. - log.Info("Syncing CSI VolumeSnapshotClasses in backup") - vsClasses, err := backupStore.GetCSIVolumeSnapshotClasses(backupName) - if err != nil { - log.WithError(errors.WithStack(err)).Error("Error getting CSI VolumeSnapClasses for this backup from backup store") - continue - } - for _, vsClass := range vsClasses { - vsClass.ResourceVersion = "" - err := b.client.Create(ctx, vsClass, &client.CreateOptions{}) - switch { - case err != nil && kuberrs.IsAlreadyExists(err): - log.Debugf("VolumeSnapshotClass %s already exists in cluster", vsClass.Name) - continue - case err != nil && !kuberrs.IsAlreadyExists(err): - log.WithError(errors.WithStack(err)).Errorf("Error syncing VolumeSnapshotClass %s into cluster", vsClass.Name) - continue - default: - log.Infof("Created CSI VolumeSnapshotClass %s", vsClass.Name) - } - } + log.Info("Syncing CSI volumesnapshotcontents in backup") + snapConts, err := backupStore.GetCSIVolumeSnapshotContents(backupName) + if err != nil { + log.WithError(errors.WithStack(err)).Error("Error getting CSI volumesnapshotcontents for this backup from backup store") + continue + } - log.Info("Syncing CSI volumesnapshotcontents in backup") - snapConts, err := backupStore.GetCSIVolumeSnapshotContents(backupName) - if err != nil { - log.WithError(errors.WithStack(err)).Error("Error getting CSI volumesnapshotcontents for this backup from backup store") + log.Infof("Syncing %d CSI volumesnapshotcontents in backup", len(snapConts)) + for _, snapCont := range snapConts { + // TODO: Reset ResourceVersion prior to persisting VolumeSnapshotContents + snapCont.ResourceVersion = "" + err := 
b.client.Create(ctx, snapCont, &client.CreateOptions{}) + switch { + case err != nil && kuberrs.IsAlreadyExists(err): + log.Debugf("volumesnapshotcontent %s already exists in cluster", snapCont.Name) continue - } - - log.Infof("Syncing %d CSI volumesnapshotcontents in backup", len(snapConts)) - for _, snapCont := range snapConts { - // TODO: Reset ResourceVersion prior to persisting VolumeSnapshotContents - snapCont.ResourceVersion = "" - err := b.client.Create(ctx, snapCont, &client.CreateOptions{}) - switch { - case err != nil && kuberrs.IsAlreadyExists(err): - log.Debugf("volumesnapshotcontent %s already exists in cluster", snapCont.Name) - continue - case err != nil && !kuberrs.IsAlreadyExists(err): - log.WithError(errors.WithStack(err)).Errorf("Error syncing volumesnapshotcontent %s into cluster", snapCont.Name) - continue - default: - log.Infof("Created CSI volumesnapshotcontent %s", snapCont.Name) - } + case err != nil && !kuberrs.IsAlreadyExists(err): + log.WithError(errors.WithStack(err)).Errorf("Error syncing volumesnapshotcontent %s into cluster", snapCont.Name) + continue + default: + log.Infof("Created CSI volumesnapshotcontent %s", snapCont.Name) } } } + } - b.deleteOrphanedBackups(ctx, location.Name, backupStoreBackups, log) + b.deleteOrphanedBackups(ctx, location.Name, backupStoreBackups, log) - // update the location's last-synced time field - statusPatch := client.MergeFrom(location.DeepCopy()) - location.Status.LastSyncedTime = &metav1.Time{Time: time.Now().UTC()} - if err := b.client.Patch(ctx, &location, statusPatch); err != nil { - log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time") - continue - } + // update the location's last-synced time field + statusPatch := client.MergeFrom(location.DeepCopy()) + location.Status.LastSyncedTime = &metav1.Time{Time: time.Now().UTC()} + if err := b.client.Patch(ctx, location, statusPatch); err != nil { + log.WithError(errors.WithStack(err)).Error("Error patching 
backup location's last-synced time") + return ctrl.Result{}, nil } return ctrl.Result{}, nil @@ -314,17 +284,14 @@ func (b *backupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error { mgr.GetClient(), &velerov1api.BackupStorageLocationList{}, backupSyncReconcilePeriod, - // Only enqueue the first BSL - func(object client.Object) bool { - var bslList velerov1api.BackupStorageLocationList - b.client.List(context.Background(), &bslList, &client.ListOptions{ - Namespace: b.namespace, - }) - if bslList.Items[0].Namespace == object.GetNamespace() && - bslList.Items[0].Name == object.GetName() { - return true - } - return false + kube.PeriodicalEnqueueSourceOption{ + OrderFunc: backupSyncSourceOrderFunc, + FilterFuncs: []func(object client.Object) bool{ + func(object client.Object) bool { + location := object.(*velerov1api.BackupStorageLocation) + return b.locationFilterFunc(location) + }, + }, }, ) @@ -346,9 +313,6 @@ func (b *backupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error { }, }). Watches(backupSyncSource, nil). - WithOptions(controller.Options{ - MaxConcurrentReconciles: 1, - }). Complete(b) } @@ -413,23 +377,64 @@ func (b *backupSyncReconciler) deleteCSISnapshotsByBackup(ctx context.Context, b } } -// orderedBackupLocations returns a new slice with the default backup location first (if it exists), +// backupSyncSourceOrderFunc returns a new slice with the default backup location first (if it exists), // followed by the rest of the locations in no particular order. 
-func orderedBackupLocations(locationList *velerov1api.BackupStorageLocationList, defaultLocationName string) []velerov1api.BackupStorageLocation { - var result []velerov1api.BackupStorageLocation +func backupSyncSourceOrderFunc(objList client.ObjectList) client.ObjectList { + inputBSLList := objList.(*velerov1api.BackupStorageLocationList) + resultBSLList := &velerov1api.BackupStorageLocationList{} + bslArray := make([]runtime.Object, 0) + + if len(inputBSLList.Items) <= 0 { + return objList + } - for i := range locationList.Items { - if locationList.Items[i].Name == defaultLocationName { + for i := range inputBSLList.Items { + location := inputBSLList.Items[i] + + // sync the default backup storage location first, if it exists + if location.Spec.Default { // put the default location first - result = append(result, locationList.Items[i]) + bslArray = append(bslArray, &inputBSLList.Items[i]) // append everything before the default - result = append(result, locationList.Items[:i]...) + for _, bsl := range inputBSLList.Items[:i] { + bslArray = append(bslArray, &bsl) + } // append everything after the default - result = append(result, locationList.Items[i+1:]...) + for _, bsl := range inputBSLList.Items[i+1:] { + bslArray = append(bslArray, &bsl) + } + meta.SetList(resultBSLList, bslArray) - return result + return resultBSLList } } - return locationList.Items + // No default BSL found. Return the input. 
+ return objList +} + +func (b *backupSyncReconciler) locationFilterFunc(location *velerov1api.BackupStorageLocation) bool { + syncPeriod := b.defaultBackupSyncPeriod + if location.Spec.BackupSyncPeriod != nil { + syncPeriod = location.Spec.BackupSyncPeriod.Duration + if syncPeriod == 0 { + log.Debug("Backup sync period for this location is set to 0, skipping sync") + return false + } + + if syncPeriod < 0 { + log.Debug("Backup sync period must be non-negative") + syncPeriod = b.defaultBackupSyncPeriod + } + } + + lastSync := location.Status.LastSyncedTime + if lastSync != nil { + log.Debug("Checking if backups need to be synced at this time for this location") + nextSync := lastSync.Add(syncPeriod) + if time.Now().UTC().Before(nextSync) { + return false + } + } + return true } diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index b20ec63967..4f1e280c55 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -25,7 +25,9 @@ import ( . 
"github.com/onsi/gomega" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" @@ -44,12 +46,30 @@ import ( velerotest "github.com/vmware-tanzu/velero/pkg/test" ) +func defaultLocation(namespace string) *velerov1api.BackupStorageLocation { + return &velerov1api.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "location-1", + }, + Spec: velerov1api.BackupStorageLocationSpec{ + Provider: "objStoreProvider", + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + Bucket: "bucket-1", + }, + }, + Default: true, + }, + } +} + func defaultLocationsList(namespace string) []*velerov1api.BackupStorageLocation { return []*velerov1api.BackupStorageLocation{ { ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, - Name: "location-1", + Name: "location-0", }, Spec: velerov1api.BackupStorageLocationSpec{ Provider: "objStoreProvider", @@ -58,32 +78,27 @@ func defaultLocationsList(namespace string) []*velerov1api.BackupStorageLocation Bucket: "bucket-1", }, }, - Default: true, }, }, { ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, - Name: "location-2", + Name: "location-1", }, Spec: velerov1api.BackupStorageLocationSpec{ Provider: "objStoreProvider", StorageType: velerov1api.StorageType{ ObjectStorage: &velerov1api.ObjectStorageLocation{ - Bucket: "bucket-2", + Bucket: "bucket-1", }, }, + Default: true, }, }, - } -} - -func defaultLocationsListWithLongerLocationName(namespace string) []*velerov1api.BackupStorageLocation { - return []*velerov1api.BackupStorageLocation{ { ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, - Name: "the-really-long-location-name-that-is-much-more-than-63-characters-1", + Name: "location-2", }, Spec: velerov1api.BackupStorageLocationSpec{ Provider: "objStoreProvider", 
@@ -97,13 +112,13 @@ func defaultLocationsListWithLongerLocationName(namespace string) []*velerov1api { ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, - Name: "the-really-long-location-name-that-is-much-more-than-63-characters-2", + Name: "location-3", }, Spec: velerov1api.BackupStorageLocationSpec{ Provider: "objStoreProvider", StorageType: velerov1api.StorageType{ ObjectStorage: &velerov1api.ObjectStorageLocation{ - Bucket: "bucket-2", + Bucket: "bucket-1", }, }, }, @@ -111,6 +126,23 @@ func defaultLocationsListWithLongerLocationName(namespace string) []*velerov1api } } +func defaultLocationWithLongerLocationName(namespace string) *velerov1api.BackupStorageLocation { + return &velerov1api.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "the-really-long-location-name-that-is-much-more-than-63-characters-1", + }, + Spec: velerov1api.BackupStorageLocationSpec{ + Provider: "objStoreProvider", + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + Bucket: "bucket-1", + }, + }, + }, + } +} + func numBackups(c ctrlClient.WithWatch, ns string) (int, error) { var existingK8SBackups velerov1api.BackupList err := c.List(context.TODO(), &existingK8SBackups, &ctrlClient.ListOptions{}) @@ -131,8 +163,8 @@ var _ = Describe("Backup Sync Reconciler", func() { tests := []struct { name string namespace string - locations []*velerov1api.BackupStorageLocation - cloudBuckets map[string][]*cloudBackupData + location *velerov1api.BackupStorageLocation + cloudBackups []*cloudBackupData existingBackups []*velerov1api.Backup existingPodVolumeBackups []*velerov1api.PodVolumeBackup longLocationNameEnabled bool @@ -140,71 +172,44 @@ var _ = Describe("Backup Sync Reconciler", func() { { name: "no cloud backups", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), + location: defaultLocation("ns-1"), }, { name: "normal case", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - 
cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, + location: defaultLocation("ns-1"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").Result(), }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), - }, + { + backup: builder.ForBackup("ns-1", "backup-2").Result(), }, }, }, { name: "all synced backups get created in Velero server's namespace", namespace: "velero", - locations: defaultLocationsList("velero"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, + location: defaultLocation("velero"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").Result(), }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-2", "backup-3").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("velero", "backup-4").Result(), - }, + { + backup: builder.ForBackup("ns-1", "backup-2").Result(), }, }, }, { name: "new backups get synced when some cloud backups already exist in the cluster", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, + location: defaultLocation("ns-1"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").Result(), }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-4").Result(), - }, + { + 
backup: builder.ForBackup("ns-1", "backup-2").Result(), }, }, existingBackups: []*velerov1api.Backup{ @@ -217,12 +222,10 @@ var _ = Describe("Backup Sync Reconciler", func() { { name: "existing backups without a StorageLocation get it filled in", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - }, + location: defaultLocation("ns-1"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").Result(), }, }, existingBackups: []*velerov1api.Backup{ @@ -234,74 +237,45 @@ var _ = Describe("Backup Sync Reconciler", func() { { name: "backup storage location names and labels get updated", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, + location: defaultLocation("ns-1"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").StorageLocation("bar").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "bar")).Result(), - }, + { + backup: builder.ForBackup("ns-1", "backup-2").Result(), }, }, }, { name: "backup storage location names and labels get updated with location name greater than 63 chars", namespace: "ns-1", - locations: defaultLocationsListWithLongerLocationName("ns-1"), + location: defaultLocationWithLongerLocationName("ns-1"), longLocationNameEnabled: true, - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { 
- &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - }, + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").StorageLocation("foo").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "foo")).Result(), }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").StorageLocation("bar").ObjectMeta(builder.WithLabels(velerov1api.StorageLocationLabel, "bar")).Result(), - }, + { + backup: builder.ForBackup("ns-1", "backup-2").Result(), }, }, }, { name: "all synced backups and pod volume backups get created in Velero server's namespace", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - }, - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), - }, + location: defaultLocation("ns-1"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), }, }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-4").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), - }, + { + backup: 
builder.ForBackup("ns-1", "backup-2").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-2").Result(), }, }, }, @@ -309,33 +283,18 @@ var _ = Describe("Backup Sync Reconciler", func() { { name: "new pod volume backups get synched when some pod volume backups already exist in the cluster", namespace: "ns-1", - locations: defaultLocationsList("ns-1"), - cloudBuckets: map[string][]*cloudBackupData{ - "bucket-1": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-1").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - }, - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-2").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), - }, + location: defaultLocation("ns-1"), + cloudBackups: []*cloudBackupData{ + { + backup: builder.ForBackup("ns-1", "backup-1").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), }, }, - "bucket-2": { - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-3").Result(), - }, - &cloudBackupData{ - backup: builder.ForBackup("ns-1", "backup-4").Result(), - podVolumeBackups: []*velerov1api.PodVolumeBackup{ - builder.ForPodVolumeBackup("ns-1", "pvb-1").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-5").Result(), - builder.ForPodVolumeBackup("ns-1", "pvb-6").Result(), - }, + { + backup: builder.ForBackup("ns-1", "backup-2").Result(), + podVolumeBackups: []*velerov1api.PodVolumeBackup{ + builder.ForPodVolumeBackup("ns-1", "pvb-3").Result(), }, }, }, @@ -363,20 +322,18 @@ var _ = Describe("Backup Sync Reconciler", func() { logger: velerotest.NewLogger(), } - for _, location := range test.locations { - Expect(r.client.Create(ctx, location)).ShouldNot(HaveOccurred()) - backupStores[location.Name] = &persistencemocks.BackupStore{} - } + if test.location != nil { 
+ Expect(r.client.Create(ctx, test.location)).ShouldNot(HaveOccurred()) + backupStores[test.location.Name] = &persistencemocks.BackupStore{} - for _, location := range test.locations { - backupStore, ok := backupStores[location.Name] - Expect(ok).To(BeTrue(), "no mock backup store for location %s", location.Name) + backupStore, ok := backupStores[test.location.Name] + Expect(ok).To(BeTrue(), "no mock backup store for location %s", test.location.Name) var backupNames []string - for _, bucket := range test.cloudBuckets[location.Spec.ObjectStorage.Bucket] { - backupNames = append(backupNames, bucket.backup.Name) - backupStore.On("GetBackupMetadata", bucket.backup.Name).Return(bucket.backup, nil) - backupStore.On("GetPodVolumeBackups", bucket.backup.Name).Return(bucket.podVolumeBackups, nil) + for _, backup := range test.cloudBackups { + backupNames = append(backupNames, backup.backup.Name) + backupStore.On("GetBackupMetadata", backup.backup.Name).Return(backup.backup, nil) + backupStore.On("GetPodVolumeBackups", backup.backup.Name).Return(backup.podVolumeBackups, nil) } backupStore.On("ListBackups").Return(backupNames, nil) } @@ -392,92 +349,79 @@ var _ = Describe("Backup Sync Reconciler", func() { } actualResult, err := r.Reconcile(ctx, ctrl.Request{ - NamespacedName: types.NamespacedName{Namespace: "ns-1"}, + NamespacedName: types.NamespacedName{Namespace: test.location.Namespace, Name: test.location.Name}, }) Expect(actualResult).To(BeEquivalentTo(ctrl.Result{})) Expect(err).To(BeNil()) - for bucket, backupDataSet := range test.cloudBuckets { - // figure out which location this bucket is for; we need this for verification - // purposes later - var location *velerov1api.BackupStorageLocation - for _, loc := range test.locations { - if loc.Spec.ObjectStorage.Bucket == bucket { - location = loc + // process the cloud backups + for _, cloudBackupData := range test.cloudBackups { + obj := &velerov1api.Backup{} + err := client.Get( + context.TODO(), + 
types.NamespacedName{ + Namespace: cloudBackupData.backup.Namespace, + Name: cloudBackupData.backup.Name}, + obj) + Expect(err).To(BeNil()) + + // did this cloud backup already exist in the cluster? + var existing *velerov1api.Backup + for _, obj := range test.existingBackups { + if obj.Name == cloudBackupData.backup.Name { + existing = obj break } } - Expect(location).NotTo(BeNil()) - // process the cloud backups - for _, cloudBackupData := range backupDataSet { - obj := &velerov1api.Backup{} + if existing != nil { + // if this cloud backup already exists in the cluster, make sure that what we get from the + // client is the existing backup, not the cloud one. + + // verify that the in-cluster backup has its storage location populated, if it's not already. + expected := existing.DeepCopy() + expected.Spec.StorageLocation = test.location.Name + + Expect(expected).To(BeEquivalentTo(obj)) + } else { + // verify that the storage location field and label are set properly + Expect(test.location.Name).To(BeEquivalentTo(obj.Spec.StorageLocation)) + + locationName := test.location.Name + if test.longLocationNameEnabled { + locationName = label.GetValidName(locationName) + } + Expect(locationName).To(BeEquivalentTo(obj.Labels[velerov1api.StorageLocationLabel])) + Expect(len(obj.Labels[velerov1api.StorageLocationLabel]) <= validation.DNS1035LabelMaxLength).To(BeTrue()) + } + + // process the cloud pod volume backups for this backup, if any + for _, podVolumeBackup := range cloudBackupData.podVolumeBackups { + objPodVolumeBackup := &velerov1api.PodVolumeBackup{} err := client.Get( context.TODO(), types.NamespacedName{ - Namespace: cloudBackupData.backup.Namespace, - Name: cloudBackupData.backup.Name}, - obj) - Expect(err).To(BeNil()) - - // did this cloud backup already exist in the cluster? 
- var existing *velerov1api.Backup - for _, obj := range test.existingBackups { - if obj.Name == cloudBackupData.backup.Name { - existing = obj + Namespace: podVolumeBackup.Namespace, + Name: podVolumeBackup.Name, + }, + objPodVolumeBackup) + Expect(err).ShouldNot(HaveOccurred()) + + // did this cloud pod volume backup already exist in the cluster? + var existingPodVolumeBackup *velerov1api.PodVolumeBackup + for _, objPodVolumeBackup := range test.existingPodVolumeBackups { + if objPodVolumeBackup.Name == podVolumeBackup.Name { + existingPodVolumeBackup = objPodVolumeBackup break } } - if existing != nil { - // if this cloud backup already exists in the cluster, make sure that what we get from the + if existingPodVolumeBackup != nil { + // if this cloud pod volume backup already exists in the cluster, make sure that what we get from the // client is the existing backup, not the cloud one. - - // verify that the in-cluster backup has its storage location populated, if it's not already. - expected := existing.DeepCopy() - expected.Spec.StorageLocation = location.Name - - Expect(expected).To(BeEquivalentTo(obj)) - } else { - // verify that the storage location field and label are set properly - Expect(location.Name).To(BeEquivalentTo(obj.Spec.StorageLocation)) - - locationName := location.Name - if test.longLocationNameEnabled { - locationName = label.GetValidName(locationName) - } - Expect(locationName).To(BeEquivalentTo(obj.Labels[velerov1api.StorageLocationLabel])) - Expect(len(obj.Labels[velerov1api.StorageLocationLabel]) <= validation.DNS1035LabelMaxLength).To(BeTrue()) - } - - // process the cloud pod volume backups for this backup, if any - for _, podVolumeBackup := range cloudBackupData.podVolumeBackups { - objPodVolumeBackup := &velerov1api.PodVolumeBackup{} - err := client.Get( - context.TODO(), - types.NamespacedName{ - Namespace: podVolumeBackup.Namespace, - Name: podVolumeBackup.Name, - }, - objPodVolumeBackup) - Expect(err).ShouldNot(HaveOccurred()) - - 
// did this cloud pod volume backup already exist in the cluster? - var existingPodVolumeBackup *velerov1api.PodVolumeBackup - for _, objPodVolumeBackup := range test.existingPodVolumeBackups { - if objPodVolumeBackup.Name == podVolumeBackup.Name { - existingPodVolumeBackup = objPodVolumeBackup - break - } - } - - if existingPodVolumeBackup != nil { - // if this cloud pod volume backup already exists in the cluster, make sure that what we get from the - // client is the existing backup, not the cloud one. - expected := existingPodVolumeBackup.DeepCopy() - Expect(expected).To(BeEquivalentTo(objPodVolumeBackup)) - } + expected := existingPodVolumeBackup.DeepCopy() + Expect(expected).To(BeEquivalentTo(objPodVolumeBackup)) } } } @@ -650,4 +594,30 @@ var _ = Describe("Backup Sync Reconciler", func() { Expect(expected).To(BeEquivalentTo(numBackups)) } }) + + It("Test moving default BSL at the head of BSL array.", func() { + locationList := &velerov1api.BackupStorageLocationList{} + objArray := make([]runtime.Object, 0) + + // Generate BSL array. + locations := defaultLocationsList("velero") + for _, bsl := range locations { + objArray = append(objArray, bsl) + } + + meta.SetList(locationList, objArray) + + testObjList := backupSyncSourceOrderFunc(locationList) + testObjArray, err := meta.ExtractList(testObjList) + Expect(err).ShouldNot(HaveOccurred()) + + expectLocation := testObjArray[0].(*velerov1api.BackupStorageLocation) + Expect(expectLocation.Spec.Default).To(BeEquivalentTo(true)) + + // If BSL list without default BSL is passed in, the output should be same with input. 
+ locationList.Items = testObjList.(*velerov1api.BackupStorageLocationList).Items[1:] + testObjList = backupSyncSourceOrderFunc(locationList) + Expect(testObjList).To(BeEquivalentTo(locationList)) + + }) }) diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index c12ba45d9c..a3a2bd5f3b 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -66,7 +66,7 @@ func NewGCReconciler( // Other Events will be filtered to decrease the number of reconcile call. Especially UpdateEvent must be filtered since we removed // the backup status as the sub-resource of backup in v1.9, every change on it will be treated as UpdateEvent and trigger reconcile call. func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error { - s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, defaultGCFrequency) + s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, defaultGCFrequency, kube.PeriodicalEnqueueSourceOption{}) return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.Backup{}). WithEventFilter(predicate.Funcs{ diff --git a/pkg/controller/restic_repository_controller.go b/pkg/controller/restic_repository_controller.go index d6cd869e3e..55b7fb6967 100644 --- a/pkg/controller/restic_repository_controller.go +++ b/pkg/controller/restic_repository_controller.go @@ -69,7 +69,7 @@ func NewResticRepoReconciler(namespace string, logger logrus.FieldLogger, client } func (r *ResticRepoReconciler) SetupWithManager(mgr ctrl.Manager) error { - s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod) + s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod, kube.PeriodicalEnqueueSourceOption{}) return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.BackupRepository{}). Watches(s, nil). 
diff --git a/pkg/controller/schedule_controller.go b/pkg/controller/schedule_controller.go index e62969b27d..9ca9de1edb 100644 --- a/pkg/controller/schedule_controller.go +++ b/pkg/controller/schedule_controller.go @@ -65,7 +65,7 @@ func NewScheduleReconciler( } func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error { - s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod) + s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod, kube.PeriodicalEnqueueSourceOption{}) return ctrl.NewControllerManagedBy(mgr). For(&velerov1.Schedule{}). Watches(s, nil). diff --git a/pkg/util/kube/periodical_enqueue_source.go b/pkg/util/kube/periodical_enqueue_source.go index 20b658c61a..16c4db0ce0 100644 --- a/pkg/util/kube/periodical_enqueue_source.go +++ b/pkg/util/kube/periodical_enqueue_source.go @@ -34,13 +34,18 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) -func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration, filters ...func(object client.Object) bool) *PeriodicalEnqueueSource { +func NewPeriodicalEnqueueSource( + logger logrus.FieldLogger, + client client.Client, + objList client.ObjectList, + period time.Duration, + option PeriodicalEnqueueSourceOption) *PeriodicalEnqueueSource { return &PeriodicalEnqueueSource{ - logger: logger.WithField("resource", reflect.TypeOf(objList).String()), - Client: client, - objList: objList, - period: period, - filterFuncs: filters, + logger: logger.WithField("resource", reflect.TypeOf(objList).String()), + Client: client, + objList: objList, + period: period, + option: option, } } @@ -49,10 +54,15 @@ func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, // the reconcile logic periodically type PeriodicalEnqueueSource struct { client.Client - logger logrus.FieldLogger - objList client.ObjectList - period 
time.Duration - filterFuncs []func(object client.Object) bool + logger logrus.FieldLogger + objList client.ObjectList + period time.Duration + option PeriodicalEnqueueSourceOption +} + +type PeriodicalEnqueueSourceOption struct { + FilterFuncs []func(object client.Object) bool + OrderFunc func(objList client.ObjectList) client.ObjectList } func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, pre ...predicate.Predicate) error { @@ -66,13 +76,16 @@ func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHand p.logger.Debug("no resources, skip") return } + if p.option.OrderFunc != nil { + p.objList = p.option.OrderFunc(p.objList) + } if err := meta.EachListItem(p.objList, func(object runtime.Object) error { obj, ok := object.(metav1.Object) if !ok { p.logger.Error("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String()) return nil } - for _, filter := range p.filterFuncs { + for _, filter := range p.option.FilterFuncs { if filter != nil { if enqueueObj := filter(object.(client.Object)); !enqueueObj { p.logger.Debugf("skip enqueue object %s/%s due to filter function.", obj.GetNamespace(), obj.GetName()) diff --git a/pkg/util/kube/periodical_enqueue_source_test.go b/pkg/util/kube/periodical_enqueue_source_test.go index 3621533477..e209d6d9eb 100644 --- a/pkg/util/kube/periodical_enqueue_source_test.go +++ b/pkg/util/kube/periodical_enqueue_source_test.go @@ -23,11 +23,14 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" crclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" 
"github.com/vmware-tanzu/velero/internal/storage" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -39,7 +42,7 @@ func TestStart(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.TODO()) client := (&fake.ClientBuilder{}).Build() queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()) - source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx), client, &velerov1.ScheduleList{}, 1*time.Second) + source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx), client, &velerov1.ScheduleList{}, 1*time.Second, PeriodicalEnqueueSourceOption{}) require.Nil(t, source.Start(ctx, nil, queue)) @@ -76,9 +79,11 @@ func TestFilter(t *testing.T) { client, &velerov1.BackupStorageLocationList{}, 1*time.Second, - func(object crclient.Object) bool { - location := object.(*velerov1.BackupStorageLocation) - return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name)) + PeriodicalEnqueueSourceOption{ + FilterFuncs: []func(object crclient.Object) bool{func(object crclient.Object) bool { + location := object.(*velerov1.BackupStorageLocation) + return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name)) + }}, }, ) @@ -104,3 +109,72 @@ func TestFilter(t *testing.T) { cancelFunc() } + +func TestOrder(t *testing.T) { + require.Nil(t, velerov1.AddToScheme(scheme.Scheme)) + + ctx, cancelFunc := context.WithCancel(context.TODO()) + client := (&fake.ClientBuilder{}).Build() + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()) + source := NewPeriodicalEnqueueSource( + logrus.WithContext(ctx), + client, + &velerov1.BackupStorageLocationList{}, + 1*time.Second, + PeriodicalEnqueueSourceOption{ + OrderFunc: func(objList crclient.ObjectList) 
crclient.ObjectList { + locationList := &velerov1.BackupStorageLocationList{} + objArray := make([]runtime.Object, 0) + + // Generate BSL array. + locations, _ := meta.ExtractList(objList) + // Move default BSL to head of array. + objArray = append(objArray, locations[1]) + objArray = append(objArray, locations[0]) + + meta.SetList(locationList, objArray) + + return locationList + }, + }, + ) + + require.Nil(t, source.Start(ctx, nil, queue)) + + // Create two BSLs; "location2" is the default one and should be + // enqueued first by the OrderFunc above. + require.Nil(t, client.Create(ctx, &velerov1.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "location1", + Namespace: "default", + }, + Spec: velerov1.BackupStorageLocationSpec{ + ValidationFrequency: &metav1.Duration{Duration: 0}, + }, + Status: velerov1.BackupStorageLocationStatus{ + LastValidationTime: &metav1.Time{Time: time.Now()}, + }, + })) + require.Nil(t, client.Create(ctx, &velerov1.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "location2", + Namespace: "default", + }, + Spec: velerov1.BackupStorageLocationSpec{ + ValidationFrequency: &metav1.Duration{Duration: 0}, + Default: true, + }, + Status: velerov1.BackupStorageLocationStatus{ + LastValidationTime: &metav1.Time{Time: time.Now()}, + }, + })) + time.Sleep(2 * time.Second) + + first, _ := queue.Get() + bsl := &velerov1.BackupStorageLocation{} + require.Equal(t, "location2", first.(reconcile.Request).Name) + require.Nil(t, client.Get(ctx, first.(reconcile.Request).NamespacedName, bsl)) + require.Equal(t, true, bsl.Spec.Default) + + cancelFunc() +}