Skip to content

Commit

Permalink
Merge pull request #5241 from blackpiglet/update_enabled_runtime_cont…
Browse files Browse the repository at this point in the history
…rollers

Controller refactor code modifications.
  • Loading branch information
qiuming-best authored Aug 30, 2022
2 parents 7164875 + eaf9fab commit c8818ec
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 105 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5241-jxun
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Controller refactor code modifications.
78 changes: 42 additions & 36 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,13 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
controller.Restore: restoreControllerRunInfo,
}
// Note: all runtime type controllers that can be disabled are grouped separately, below:
enabledRuntimeControllers := make(map[string]struct{})
enabledRuntimeControllers[controller.ServerStatusRequest] = struct{}{}
enabledRuntimeControllers[controller.DownloadRequest] = struct{}{}
enabledRuntimeControllers[controller.GarbageCollection] = struct{}{}
enabledRuntimeControllers := map[string]struct{}{
controller.ServerStatusRequest: {},
controller.DownloadRequest: {},
controller.Schedule: {},
controller.ResticRepo: {},
controller.BackupDeletion: {},
}

if s.config.restoreOnly {
s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers")
Expand Down Expand Up @@ -789,50 +792,53 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupStorageLocation)
}

if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
if _, ok := enabledRuntimeControllers[controller.Schedule]; ok {
if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
}
}

if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ResticRepo)
if _, ok := enabledRuntimeControllers[controller.ResticRepo]; ok {
if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ResticRepo)
}
}

if err := controller.NewBackupDeletionReconciler(
s.logger,
s.mgr.GetClient(),
backupTracker,
s.repoManager,
s.metrics,
s.discoveryHelper,
newPluginManager,
backupStoreGetter,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupDeletion)
if _, ok := enabledRuntimeControllers[controller.BackupDeletion]; ok {
if err := controller.NewBackupDeletionReconciler(
s.logger,
s.mgr.GetClient(),
backupTracker,
s.repoManager,
s.metrics,
s.discoveryHelper,
newPluginManager,
backupStoreGetter,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupDeletion)
}
}

if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok {
r := controller.ServerStatusRequestReconciler{
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Ctx: s.ctx,
PluginRegistry: s.pluginRegistry,
Clock: clock.RealClock{},
Log: s.logger,
}
if err := r.SetupWithManager(s.mgr); err != nil {
if err := controller.NewServerStatusRequestReconciler(
s.mgr.GetClient(),
s.ctx,
s.pluginRegistry,
clock.RealClock{},
s.logger,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ServerStatusRequest)
}
}

if _, ok := enabledRuntimeControllers[controller.DownloadRequest]; ok {
r := controller.DownloadRequestReconciler{
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Clock: clock.RealClock{},
NewPluginManager: newPluginManager,
BackupStoreGetter: backupStoreGetter,
Log: s.logger,
}
r := controller.NewDownloadRequestReconciler(
s.mgr.GetClient(),
clock.RealClock{},
newPluginManager,
backupStoreGetter,
s.logger,
)
if err := r.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.DownloadRequest)
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ func TestRemoveControllers(t *testing.T) {
errorExpected bool
}{
{
name: "Remove one disabable controller",
name: "Remove one disable controller",
disabledControllers: []string{
controller.Backup,
},
errorExpected: false,
},
{
name: "Remove all disabable controllers",
name: "Remove all disable controllers",
disabledControllers: []string{
controller.Backup,
controller.BackupDeletion,
Expand All @@ -102,15 +102,15 @@ func TestRemoveControllers(t *testing.T) {
errorExpected: false,
},
{
name: "Remove with a non-disabable controller included",
name: "Remove with a non-disable controller included",
disabledControllers: []string{
controller.Backup,
controller.BackupStorageLocation,
},
errorExpected: true,
},
{
name: "Remove with a misspelled/inexisting controller name",
name: "Remove with a misspelled/non-existing controller name",
disabledControllers: []string{
"go",
},
Expand All @@ -122,16 +122,16 @@ func TestRemoveControllers(t *testing.T) {
enabledControllers := map[string]func() controllerRunInfo{
controller.BackupSync: func() controllerRunInfo { return controllerRunInfo{} },
controller.Backup: func() controllerRunInfo { return controllerRunInfo{} },
controller.Schedule: func() controllerRunInfo { return controllerRunInfo{} },
controller.GarbageCollection: func() controllerRunInfo { return controllerRunInfo{} },
controller.BackupDeletion: func() controllerRunInfo { return controllerRunInfo{} },
controller.Restore: func() controllerRunInfo { return controllerRunInfo{} },
controller.ResticRepo: func() controllerRunInfo { return controllerRunInfo{} },
controller.DownloadRequest: func() controllerRunInfo { return controllerRunInfo{} },
}

enabledRuntimeControllers := map[string]struct{}{
controller.ServerStatusRequest: {},
controller.Schedule: {},
controller.BackupDeletion: {},
controller.ResticRepo: {},
controller.DownloadRequest: {},
}

totalNumOriginalControllers := len(enabledControllers) + len(enabledRuntimeControllers)
Expand Down
61 changes: 38 additions & 23 deletions pkg/controller/download_request_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -33,31 +32,47 @@ import (
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
)

// DownloadRequestReconciler reconciles a DownloadRequest object
type DownloadRequestReconciler struct {
Scheme *runtime.Scheme
Client kbclient.Client
Clock clock.Clock
// downloadRequestReconciler reconciles a DownloadRequest object
type downloadRequestReconciler struct {
client kbclient.Client
clock clock.Clock
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
NewPluginManager func(logrus.FieldLogger) clientmgmt.Manager
BackupStoreGetter persistence.ObjectBackupStoreGetter
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter

Log logrus.FieldLogger
log logrus.FieldLogger
}

// NewDownloadRequestReconciler initializes and returns downloadRequestReconciler struct.
func NewDownloadRequestReconciler(
client kbclient.Client,
clock clock.Clock,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
log logrus.FieldLogger,
) *downloadRequestReconciler {
return &downloadRequestReconciler{
client: client,
clock: clock,
newPluginManager: newPluginManager,
backupStoreGetter: backupStoreGetter,
log: log,
}
}

// +kubebuilder:rbac:groups=velero.io,resources=downloadrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=downloadrequests/status,verbs=get;update;patch
func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithFields(logrus.Fields{
func (r *downloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.log.WithFields(logrus.Fields{
"controller": "download-request",
"downloadRequest": req.NamespacedName,
})

// Fetch the DownloadRequest instance.
log.Debug("Getting DownloadRequest")
downloadRequest := &velerov1api.DownloadRequest{}
if err := r.Client.Get(ctx, req.NamespacedName, downloadRequest); err != nil {
if err := r.client.Get(ctx, req.NamespacedName, downloadRequest); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find DownloadRequest")
return ctrl.Result{}, nil
Expand All @@ -70,19 +85,19 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
original := downloadRequest.DeepCopy()
defer func() {
// Always attempt to Patch the downloadRequest object and status after each reconciliation.
if err := r.Client.Patch(ctx, downloadRequest, kbclient.MergeFrom(original)); err != nil {
if err := r.client.Patch(ctx, downloadRequest, kbclient.MergeFrom(original)); err != nil {
log.WithError(err).Error("Error updating download request")
return
}
}()

if downloadRequest.Status != (velerov1api.DownloadRequestStatus{}) && downloadRequest.Status.Expiration != nil {
if downloadRequest.Status.Expiration.Time.Before(r.Clock.Now()) {
if downloadRequest.Status.Expiration.Time.Before(r.clock.Now()) {

// Delete any request that is expired, regardless of the phase: it is not
// worth proceeding and trying/retrying to find it.
log.Debug("DownloadRequest has expired - deleting")
if err := r.Client.Delete(ctx, downloadRequest); err != nil {
if err := r.client.Delete(ctx, downloadRequest); err != nil {
log.WithError(err).Error("Error deleting an expired download request")
return ctrl.Result{}, errors.WithStack(err)
}
Expand All @@ -103,12 +118,12 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if downloadRequest.Status.Phase == "" || downloadRequest.Status.Phase == velerov1api.DownloadRequestPhaseNew {

// Update the expiration.
downloadRequest.Status.Expiration = &metav1.Time{Time: r.Clock.Now().Add(persistence.DownloadURLTTL)}
downloadRequest.Status.Expiration = &metav1.Time{Time: r.clock.Now().Add(persistence.DownloadURLTTL)}

if downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreLog ||
downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreResults {
restore := &velerov1api.Restore{}
if err := r.Client.Get(ctx, kbclient.ObjectKey{
if err := r.client.Get(ctx, kbclient.ObjectKey{
Namespace: downloadRequest.Namespace,
Name: downloadRequest.Spec.Target.Name,
}, restore); err != nil {
Expand All @@ -118,25 +133,25 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

backup := &velerov1api.Backup{}
if err := r.Client.Get(ctx, kbclient.ObjectKey{
if err := r.client.Get(ctx, kbclient.ObjectKey{
Namespace: downloadRequest.Namespace,
Name: backupName,
}, backup); err != nil {
return ctrl.Result{}, errors.WithStack(err)
}

location := &velerov1api.BackupStorageLocation{}
if err := r.Client.Get(ctx, kbclient.ObjectKey{
if err := r.client.Get(ctx, kbclient.ObjectKey{
Namespace: backup.Namespace,
Name: backup.Spec.StorageLocation,
}, location); err != nil {
return ctrl.Result{}, errors.WithStack(err)
}

pluginManager := r.NewPluginManager(log)
pluginManager := r.newPluginManager(log)
defer pluginManager.CleanupClients()

backupStore, err := r.BackupStoreGetter.Get(location, pluginManager, log)
backupStore, err := r.backupStoreGetter.Get(location, pluginManager, log)
if err != nil {
log.WithError(err).Error("Error getting a backup store")
return ctrl.Result{}, errors.WithStack(err)
Expand All @@ -149,15 +164,15 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
downloadRequest.Status.Phase = velerov1api.DownloadRequestPhaseProcessed

// Update the expiration again to extend the time we wait (the TTL) to start after successfully processing the URL.
downloadRequest.Status.Expiration = &metav1.Time{Time: r.Clock.Now().Add(persistence.DownloadURLTTL)}
downloadRequest.Status.Expiration = &metav1.Time{Time: r.clock.Now().Add(persistence.DownloadURLTTL)}
}

// Requeue is mostly to handle deleting any expired requests that were not
// deleted as part of the normal client flow for whatever reason.
return ctrl.Result{Requeue: true}, nil
}

func (r *DownloadRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *downloadRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.DownloadRequest{}).
Complete(r)
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/download_request_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = Describe("Download Request Reconciler", func() {
test.downloadRequest.Status.Expiration = &metav1.Time{Time: rClock.Now().Add(-1 * time.Minute)}
}

fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
err = fakeClient.Create(context.TODO(), test.downloadRequest)
Expect(err).To(BeNil())

Expand All @@ -109,13 +109,13 @@ var _ = Describe("Download Request Reconciler", func() {

// Setup reconciler
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
r := DownloadRequestReconciler{
Client: fakeClient,
Clock: rClock,
NewPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
BackupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores),
Log: velerotest.NewLogger(),
}
r := NewDownloadRequestReconciler(
fakeClient,
rClock,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeObjectBackupStoreGetter(backupStores),
velerotest.NewLogger(),
)

if test.backupLocation != nil && test.expectGetsURL {
backupStores[test.backupLocation.Name].On("GetDownloadURL", test.downloadRequest.Spec.Target).Return("a-url", nil)
Expand All @@ -136,7 +136,7 @@ var _ = Describe("Download Request Reconciler", func() {
}

instance := &velerov1api.DownloadRequest{}
err = r.Client.Get(ctx, kbclient.ObjectKey{Name: test.downloadRequest.Name, Namespace: test.downloadRequest.Namespace}, instance)
err = r.client.Get(ctx, kbclient.ObjectKey{Name: test.downloadRequest.Name, Namespace: test.downloadRequest.Namespace}, instance)

if test.expired {
Expect(instance).ToNot(Equal(test.downloadRequest))
Expand All @@ -153,7 +153,7 @@ var _ = Describe("Download Request Reconciler", func() {
if test.expectGetsURL {
Expect(string(instance.Status.Phase)).To(Equal(string(velerov1api.DownloadRequestPhaseProcessed)))
Expect(instance.Status.DownloadURL).To(Equal("a-url"))
Expect(velerotest.TimesAreEqual(instance.Status.Expiration.Time, r.Clock.Now().Add(signedURLTTL))).To(BeTrue())
Expect(velerotest.TimesAreEqual(instance.Status.Expiration.Time, r.clock.Now().Add(signedURLTTL))).To(BeTrue())
}
},

Expand Down
Loading

0 comments on commit c8818ec

Please sign in to comment.