Commit

Refactor backup controller based on kubebuilder
Signed-off-by: Ming <mqiu@vmware.com>
qiuming-best committed Feb 15, 2023
1 parent a761111 commit 1c0e926
Showing 6 changed files with 288 additions and 426 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5835-qiuming-best
@@ -0,0 +1 @@
Remove redundant client from backup controller
35 changes: 18 additions & 17 deletions pkg/backup/backup.go
@@ -34,14 +34,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"

"github.com/vmware-tanzu/velero/internal/hook"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"

kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/pkg/discovery"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
@@ -50,6 +51,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/collections"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// BackupVersion is the current backup major version for Velero.
@@ -71,7 +73,7 @@ type Backupper interface {

// kubernetesBackupper implements Backupper.
type kubernetesBackupper struct {
backupClient velerov1client.BackupsGetter
kbClient kbclient.Client
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
podCommandExecutor podexec.PodCommandExecutor
@@ -98,7 +100,7 @@ func cohabitatingResources() map[string]*cohabitatingResource {

// NewKubernetesBackupper creates a new kubernetesBackupper.
func NewKubernetesBackupper(
backupClient velerov1client.BackupsGetter,
kbClient kbclient.Client,
discoveryHelper discovery.Helper,
dynamicFactory client.DynamicFactory,
podCommandExecutor podexec.PodCommandExecutor,
@@ -109,7 +111,7 @@ func NewKubernetesBackupper(
uploaderType string,
) (Backupper, error) {
return &kubernetesBackupper{
backupClient: backupClient,
kbClient: kbClient,
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
podCommandExecutor: podCommandExecutor,
@@ -272,8 +274,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
log.WithField("progress", "").Infof("Collected %d items matching the backup spec from the Kubernetes API (actual number of items backed up may be more or less depending on velero.io/exclude-from-backup annotation, plugins returning additional related items to back up, etc.)", len(items))

backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(items)}
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d}}}`, len(items))
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(context.TODO(), backupRequest.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
original := backupRequest.Backup.DeepCopy()
backupRequest.Backup.Status.Progress.TotalItems = len(items)
if err := kube.PatchResource(original, backupRequest.Backup, kb.kbClient); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress.totalItems")
}

@@ -323,11 +326,10 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
lastUpdate = &val
case <-ticker.C:
if lastUpdate != nil {
backupRequest.Status.Progress.TotalItems = lastUpdate.totalItems
backupRequest.Status.Progress.ItemsBackedUp = lastUpdate.itemsBackedUp

patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, lastUpdate.totalItems, lastUpdate.itemsBackedUp)
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(context.TODO(), backupRequest.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: lastUpdate.totalItems, ItemsBackedUp: lastUpdate.itemsBackedUp}
original := backupRequest.Backup.DeepCopy()
backupRequest.Backup.Status.Progress = &velerov1api.BackupProgress{TotalItems: lastUpdate.totalItems, ItemsBackedUp: lastUpdate.itemsBackedUp}
if err := kube.PatchResource(original, backupRequest.Backup, kb.kbClient); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
lastUpdate = nil
@@ -402,11 +404,10 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,

// do a final update on progress since we may have just added some CRDs and may not have updated
// for the last few processed items.
backupRequest.Status.Progress.TotalItems = len(backupRequest.BackedUpItems)
backupRequest.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems)

patch = fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, len(backupRequest.BackedUpItems), len(backupRequest.BackedUpItems))
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(context.TODO(), backupRequest.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
original = backupRequest.Backup.DeepCopy()
backupRequest.Backup.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
if err := kube.PatchResource(original, backupRequest.Backup, kb.kbClient); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}

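The edits above replace hand-rolled JSON merge patches sent through the typed BackupsGetter client with Velero's kube.PatchResource helper and the controller-runtime client. A minimal sketch of that deep-copy-then-patch pattern, written directly against the controller-runtime client (the helper's own signature aside, names here are illustrative):

// Illustrative sketch, not the Velero helper itself: update only
// status.progress on a Backup by diffing a deep copy against the
// modified object and sending the resulting merge patch.
package example

import (
	"context"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
)

func patchBackupProgress(ctx context.Context, c kbclient.Client, backup *velerov1api.Backup, total, backedUp int) error {
	original := backup.DeepCopy()
	backup.Status.Progress = &velerov1api.BackupProgress{
		TotalItems:    total,
		ItemsBackedUp: backedUp,
	}
	// kbclient.MergeFrom produces a JSON merge patch containing only the
	// fields that changed relative to the original object.
	return c.Patch(ctx, backup, kbclient.MergeFrom(original))
}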
2 changes: 1 addition & 1 deletion pkg/backup/backup_test.go
@@ -2806,7 +2806,7 @@ func newHarness(t *testing.T) *harness {
return &harness{
APIServer: apiServer,
backupper: &kubernetesBackupper{
backupClient: apiServer.VeleroClient.VeleroV1(),
kbClient: test.NewFakeControllerRuntimeClient(t),
dynamicFactory: client.NewDynamicFactory(apiServer.DynamicClient),
discoveryHelper: discoveryHelper,

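The test harness now swaps the fake typed clientset for a fake controller-runtime client. Independent of Velero's test.NewFakeControllerRuntimeClient helper, a minimal sketch of how such a fake client is typically constructed (the function name is illustrative):

// A sketch of building an in-memory controller-runtime client for unit
// tests: register the API types on a scheme, then build the fake client.
package example

import (
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

func newFakeClient() (client.Client, error) {
	scheme := runtime.NewScheme()
	// Core Kubernetes types plus Velero's API types must be registered
	// before the fake client can serve them.
	if err := clientgoscheme.AddToScheme(scheme); err != nil {
		return nil, err
	}
	if err := velerov1api.AddToScheme(scheme); err != nil {
		return nil, err
	}
	return fake.NewClientBuilder().WithScheme(scheme).Build(), nil
}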
98 changes: 23 additions & 75 deletions pkg/cmd/server/server.go
@@ -33,7 +33,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
@@ -50,7 +49,6 @@ import (
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/storage"
@@ -574,32 +572,6 @@ func (s *server) initRepoManager() error {
return nil
}

func (s *server) getCSIVolumeSnapshotListers() snapshotv1listers.VolumeSnapshotLister {
// Make empty listers that will only be populated if CSI is properly enabled.
var vsLister snapshotv1listers.VolumeSnapshotLister
var err error

// If CSI is enabled, check for the CSI groups and generate the listers
// If CSI isn't enabled, return empty listers.
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
_, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String())
switch {
case apierrors.IsNotFound(err):
// CSI is enabled, but the required CRDs aren't installed, so halt.
s.logger.Fatalf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String())
case err == nil:
// CSI is enabled, and the resources were found.
// Instantiate the listers fully
s.logger.Debug("Creating CSI listers")
// Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe.
vsLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshots().Lister()
case err != nil:
cmd.CheckError(err)
}
}
return vsLister
}

func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string) error {
s.logger.Info("Starting controllers")

@@ -626,52 +598,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string

backupTracker := controller.NewBackupTracker()

backupControllerRunInfo := func() controllerRunInfo {
backupper, err := backup.NewKubernetesBackupper(
s.veleroClient.VeleroV1(),
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.kubeClient.CoreV1(),
s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToFsBackup,
s.config.clientPageSize,
s.config.uploaderType,
)
cmd.CheckError(err)

backupController := controller.NewBackupController(
s.sharedInformerFactory.Velero().V1().Backups(),
s.veleroClient.VeleroV1(),
s.discoveryHelper,
backupper,
s.logger,
s.logLevel,
newPluginManager,
backupTracker,
s.mgr.GetClient(),
s.config.defaultBackupLocation,
s.config.defaultVolumesToFsBackup,
s.config.defaultBackupTTL,
s.config.defaultCSISnapshotTimeout,
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
defaultVolumeSnapshotLocations,
s.metrics,
backupStoreGetter,
s.config.formatFlag.Parse(),
s.getCSIVolumeSnapshotListers(),
s.csiSnapshotClient,
s.credentialFileStore,
)

return controllerRunInfo{
controller: backupController,
numWorkers: defaultControllerWorkers,
}
}

restoreControllerRunInfo := func() controllerRunInfo {
restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
@@ -714,11 +640,11 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
// This is because of PVB and PVR are used by node agent DaemonSet,
// and BSL controller is mandatory for Velero to work.
enabledControllers := map[string]func() controllerRunInfo{
controller.Backup: backupControllerRunInfo,
controller.Restore: restoreControllerRunInfo,
}
// Note: all runtime type controllers that can be disabled are grouped separately, below:
enabledRuntimeControllers := map[string]struct{}{
controller.Backup: {},
controller.ServerStatusRequest: {},
controller.DownloadRequest: {},
controller.Schedule: {},
@@ -773,6 +699,28 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger.WithField("informer", informer).Info("Informer cache synced")
}

if _, ok := enabledRuntimeControllers[controller.Backup]; ok {
backupper, err := backup.NewKubernetesBackupper(
s.mgr.GetClient(),
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.kubeClient.CoreV1(),
s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToFsBackup,
s.config.clientPageSize,
s.config.uploaderType,
)
cmd.CheckError(err)
if err := controller.NewBackupReconciler(s.ctx, s.discoveryHelper, backupper, s.logger, s.logLevel, newPluginManager, backupTracker, s.mgr.GetClient(),
s.config.defaultBackupLocation, s.config.defaultVolumesToFsBackup, s.config.defaultBackupTTL, s.config.defaultCSISnapshotTimeout, defaultVolumeSnapshotLocations,
s.metrics, backupStoreGetter, s.config.formatFlag.Parse(), s.credentialFileStore).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Backup)
}
}

bslr := controller.NewBackupStorageLocationReconciler(
s.ctx,
s.mgr.GetClient(),
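The registration block added above follows the standard kubebuilder pattern: build the backupper once, construct the reconciler with the manager's client, and wire it up through SetupWithManager. A stripped-down, hypothetical sketch of that pattern (the struct, fields, and Reconcile body are illustrative, not Velero's actual NewBackupReconciler):

// Generic controller-runtime pattern the refactor moves to: a reconciler
// struct holding the manager's client, a Reconcile method driven by the
// manager's shared cache, and registration via SetupWithManager.
package example

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	kbclient "sigs.k8s.io/controller-runtime/pkg/client"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

type backupReconciler struct {
	client kbclient.Client
}

// Reconcile is invoked whenever a watched Backup changes, replacing the
// informer + workqueue wiring of the old generic controller.
func (r *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	backup := &velerov1api.Backup{}
	if err := r.client.Get(ctx, req.NamespacedName, backup); err != nil {
		// Ignore not-found errors: the Backup may have been deleted.
		return ctrl.Result{}, kbclient.IgnoreNotFound(err)
	}
	// ... process the backup ...
	return ctrl.Result{}, nil
}

// SetupWithManager registers the reconciler and its watch on Backup objects.
func (r *backupReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&velerov1api.Backup{}).
		Complete(r)
}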
