Skip to content

Commit

Permalink
delete snaps with vol backup
Browse files Browse the repository at this point in the history
  • Loading branch information
vineela1999 committed Mar 1, 2023
1 parent 3e776d0 commit 558266f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 27 deletions.
8 changes: 5 additions & 3 deletions controllers/backup/backupcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package backup
import (
"context"
"fmt"
"github.com/soda-cdm/kahu/volume"
"regexp"
"strings"

"github.com/soda-cdm/kahu/volume"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -243,8 +244,9 @@ func (ctrl *controller) ensureSupportedResourceList(backup *kahuapi.Backup) (*ka

func (ctrl *controller) deleteBackup(backup *kahuapi.Backup) error {
ctrl.logger.Infof("Initiating backup(%s) delete", backup.Name)

err := ctrl.removeVolumeBackup(backup)
backupResources := NewBackupResources(ctrl.logger,
ctrl.dynamicClient, ctrl.kubeClient, ctrl.discoveryHelper, ctrl)
err := ctrl.removeVolumeBackup(backup, backupResources)
if err != nil {
ctrl.logger.Errorf("Unable to delete volume backup. %s", err)
return err
Expand Down
26 changes: 25 additions & 1 deletion controllers/backup/backupvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,25 @@ func (ctrl *controller) ensureVolumeBackupParameters(backup *kahuapi.Backup) (ma
}

func (ctrl *controller) removeVolumeBackup(
backup *kahuapi.Backup) error {
backup *kahuapi.Backup, resources Resources) error {

pvcs, err := ctrl.getVolumes(backup, resources)
if err != nil {
ctrl.logger.Errorf("Volume backup validation failed. %s", err)

return err
}

volumeGroup, err := ctrl.volumeHandler.Group().ByPVCs(backup.Name, pvcs)
if err != nil {
ctrl.logger.Errorf("Failed to ensure volume group. %s", err)
return err
}

snapshotter, err := ctrl.volumeHandler.Snapshot().ByVolumeGroup(volumeGroup)
if err != nil {
return err
}
vbcList, err := ctrl.volumeBackupClient.List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.Set{
volumeContentBackupLabel: backup.Name,
Expand All @@ -155,6 +172,13 @@ func (ctrl *controller) removeVolumeBackup(
}

for _, vbc := range vbcList.Items {

snapName := vbc.Spec.BackupSourceRef.Name
err = snapshotter.Delete(snapName)
if err != nil {
return err
}

if vbc.DeletionTimestamp != nil { // ignore deleting volume backup content
continue
}
Expand Down
87 changes: 71 additions & 16 deletions controllers/snapshot/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ package snapshot

import (
"context"
"encoding/json"
"fmt"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand All @@ -46,9 +49,10 @@ import (
)

const (
controllerName = "snapshot-controller"
defaultReSyncTimeLoop = 30 * time.Minute
annSnapshotVolumeSync = "kahu.io/snapshot-volume-sync"
controllerName = "snapshot-controller"
defaultReSyncTimeLoop = 30 * time.Minute
annSnapshotVolumeSync = "kahu.io/snapshot-volume-sync"
volumeSnapshotFinalizer = "kahu.io/volume-snapshot-done"
)

type controller struct {
Expand Down Expand Up @@ -146,6 +150,39 @@ func (ctrl *controller) reSync() {
}
}

func (ctrl *controller) patchSnapshot(oldSnapshot, updatedSnapshot *kahuapi.VolumeSnapshot) (*kahuapi.VolumeSnapshot, error) {
origBytes, err := json.Marshal(oldSnapshot)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original backup")
}

updatedBytes, err := json.Marshal(updatedSnapshot)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated backup")
}

patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for backup")
}

newSnapshot, err := ctrl.kahuClient.KahuV1beta1().VolumeSnapshots().Patch(context.TODO(),
oldSnapshot.Name,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching backup")
}

_, err = utils.StoreRevisionUpdate(ctrl.processedSnapshot, newSnapshot, "Restore")
if err != nil {
return newSnapshot, errors.Wrap(err, "Failed to updated processed restore cache")
}

return newSnapshot, nil
}

func (ctrl *controller) processQueue(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand All @@ -164,15 +201,7 @@ func (ctrl *controller) processQueue(key string) error {
return errors.Wrap(err, fmt.Sprintf("error getting snapshot %s from lister", name))
}

if snapshot.DeletionTimestamp != nil {
return nil
}

if snapshotHandled(snapshot) {
ctrl.logger.Infof("Volume Snapshot %s already handled", snapshot.Name)
return nil
}

newSnapshot := snapshot.DeepCopy()
newObj, err := utils.StoreRevisionUpdate(ctrl.processedSnapshot, snapshot, "Snapshot")
if err != nil {
ctrl.logger.Errorf("%s", err)
Expand All @@ -181,23 +210,41 @@ func (ctrl *controller) processQueue(key string) error {
return nil
}

if newSnapshot.DeletionTimestamp != nil {
err := ctrl.deleteCSISnapshot(snapshot)
if err != nil {
return err
}
snapshotClone := snapshot.DeepCopy()
utils.RemoveFinalizer(snapshotClone, volumeSnapshotFinalizer)
snapshot, err = ctrl.patchSnapshot(snapshot, snapshotClone)
if err != nil {
ctrl.logger.Errorf("removing finalizer failed for %s", snapshotClone.Name)
}
}

if snapshotHandled(newSnapshot) {
ctrl.logger.Infof("Volume Snapshot %s already handled", newSnapshot.Name)
return nil
}

// Identify volumes for snapshot
snapshot, err = ctrl.syncSnapshotVolumes(snapshot)
newSnapshot, err = ctrl.syncSnapshotVolumes(newSnapshot)
if err != nil {
return err
}

// check volume snapshot support for CSI
support, err := ctrl.supportCSISnapshot(snapshot)
support, err := ctrl.supportCSISnapshot(newSnapshot)
if err != nil {
return err
}

if support {
return ctrl.handleCSISnapshot(snapshot)
return ctrl.handleCSISnapshot(newSnapshot)
}

return ctrl.handleSnapshot(snapshot)
return ctrl.handleSnapshot(newSnapshot)
}

func (ctrl *controller) syncSnapshotVolumes(snapshot *kahuapi.VolumeSnapshot) (*kahuapi.VolumeSnapshot, error) {
Expand Down Expand Up @@ -271,6 +318,14 @@ func (ctrl *controller) handleCSISnapshot(snapshot *kahuapi.VolumeSnapshot) erro
return err
}

func (ctrl *controller) deleteCSISnapshot(snapshot *kahuapi.VolumeSnapshot) error {
err := ctrl.csiSnapshotHandler.Run(snapshot.Name, func() error {
ctrl.logger.Infof("Volume Snapshot %s came till before delete func *****", snapshot.Name)
return ctrl.csiSnapshotter.Delete(snapshot)
})
return err
}

func (ctrl *controller) handleSnapshot(snapshot *kahuapi.VolumeSnapshot) error {
ctrl.logger.Info("Waiting for external snapshot controller to handle volume snapshot")
ctrl.eventRecorder.Event(snapshot, v1.EventTypeNormal, "ExternalSnapshotHandling",
Expand Down
41 changes: 40 additions & 1 deletion controllers/snapshot/csi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ import (
const (
csiSnapshotTimeout = 5 * time.Minute
defaultSnapshotNamePrefix = "snapshot"
volumeSnapshotFinalizer = "kahu.io/volume-snapshot-done"
)

type Snapshoter interface {
Handle(snapshot *kahuapi.VolumeSnapshot) error
Delete(snapshot *kahuapi.VolumeSnapshot) error
}

type snapshoter struct {
Expand Down Expand Up @@ -76,8 +78,33 @@ func NewSnapshotter(ctx context.Context,
}, nil
}

func (s *snapshoter) Delete(snapshot *kahuapi.VolumeSnapshot) error {
//delete snapshot for each volumes
// create CSI object
s.logger.Infof(" delete volume snapshot")
csiSnapshotClass, err := s.volSnapshotClassSyncer.SnapshotClassByProvider(*snapshot.Spec.SnapshotProvider)
if err != nil {
return err
}

for _, snapshotState := range snapshot.Status.SnapshotStates {
// ignore if already CSI object deleted
if snapshotState.CSISnapshotRef == nil {
continue
}
// delete CSI object
if err := s.deleteSnapshot(snapshot, csiSnapshotClass.Name, snapshotState.PVC); err != nil {
s.logger.Errorf("Error applying volume snapshot %s", err)
return err
}
}
return err

}

func (s *snapshoter) Handle(snapshot *kahuapi.VolumeSnapshot) error {
// create snapshot for each snapshot volumes
s.logger.Infof(" applying volume snapshot in handle ")
csiSnapshotClass, err := s.volSnapshotClassSyncer.SnapshotClassByProvider(*snapshot.Spec.SnapshotProvider)
if err != nil {
return err
Expand Down Expand Up @@ -108,7 +135,6 @@ func (s *snapshoter) Handle(snapshot *kahuapi.VolumeSnapshot) error {
s.logger.Errorf("Failed to update Volume Snapshot(%s) status, %s", snapshot.Name, err)
return err
}

kahuVolSnapshot.Status.ReadyToUse = &readyToUse
_, err = s.kahuClient.KahuV1beta1().
VolumeSnapshots().
Expand All @@ -121,6 +147,19 @@ func (s *snapshoter) Handle(snapshot *kahuapi.VolumeSnapshot) error {
return err
}

func (s *snapshoter) deleteSnapshot(kahuVolSnapshot *kahuapi.VolumeSnapshot,
snapshotClassName string,
pvc kahuapi.ResourceReference) error {
for _, states := range kahuVolSnapshot.Status.SnapshotStates {
s.logger.Infof("***deleting Volume Snapshot(%s) status***", states.CSISnapshotRef.Name)
err := s.snapshotClient.VolumeSnapshots(pvc.Namespace).Delete(context.TODO(), states.CSISnapshotRef.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
}
return nil
}

func (s *snapshoter) applySnapshot(kahuVolSnapshotName string,
snapshotClassName string,
pvc kahuapi.ResourceReference) error {
Expand Down
17 changes: 11 additions & 6 deletions volume/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ import (
)

const (
LabelBackupName = "kahu.backup.name"
LabelProvisionerName = "kahu.provisioner.name"
LabelBackupName = "kahu.backup.name"
LabelProvisionerName = "kahu.provisioner.name"
volumeSnapshotFinalizer = "kahu.io/volume-snapshot-done"
)

type Interface interface {
Apply() error
WaitForSnapshotToReady(refName string, timeout time.Duration) error
Delete() (*kahuapi.VolumeSnapshot, error)
Delete(snapName string) error
GetSnapshots() ([]*kahuapi.VolumeSnapshot, error)
}

Expand Down Expand Up @@ -156,7 +157,9 @@ func (s *snapshoter) snapshotClassByPV(name string) (*corev1.PersistentVolume, e
func (s *snapshoter) kahuSnapshot(backup string,
provisioner string, volumes []kahuapi.ResourceReference) (*kahuapi.VolumeSnapshot, error) {
kahuSnapshot := s.volGroupToSnapshot(backup, provisioner, volumes)

if !utils.ContainsFinalizer(kahuSnapshot, volumeSnapshotFinalizer) {
utils.SetFinalizer(kahuSnapshot, volumeSnapshotFinalizer)
}
return s.kahuClient.KahuV1beta1().VolumeSnapshots().Create(context.TODO(), kahuSnapshot, metav1.CreateOptions{})
}

Expand All @@ -181,8 +184,10 @@ func (s *snapshoter) volGroupToSnapshot(backup string,
}
}

func (s *snapshoter) Delete() (*kahuapi.VolumeSnapshot, error) {
return nil, nil
func (s *snapshoter) Delete(snapName string) error {
s.logger.Infof("***Deleting snapshots(%v)***:", snapName)
err := s.kahuClient.KahuV1beta1().VolumeSnapshots().Delete(context.TODO(), snapName, metav1.DeleteOptions{})
return err
}

func (s *snapshoter) WaitForSnapshotToReady(refName string, timeout time.Duration) error {
Expand Down

0 comments on commit 558266f

Please sign in to comment.