Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete kahu snapshots and CSI snapshots while deleting the volume backup #148

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions controllers/backup/backupcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package backup
import (
"context"
"fmt"
"github.com/soda-cdm/kahu/volume"
"regexp"
"strings"

"github.com/soda-cdm/kahu/volume"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -243,8 +244,9 @@ func (ctrl *controller) ensureSupportedResourceList(backup *kahuapi.Backup) (*ka

func (ctrl *controller) deleteBackup(backup *kahuapi.Backup) error {
ctrl.logger.Infof("Initiating backup(%s) delete", backup.Name)

err := ctrl.removeVolumeBackup(backup)
backupResources := NewBackupResources(ctrl.logger,
ctrl.dynamicClient, ctrl.kubeClient, ctrl.discoveryHelper, ctrl)
err := ctrl.removeVolumeBackup(backup, backupResources)
if err != nil {
ctrl.logger.Errorf("Unable to delete volume backup. %s", err)
return err
Expand Down
26 changes: 25 additions & 1 deletion controllers/backup/backupvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,25 @@ func (ctrl *controller) ensureVolumeBackupParameters(backup *kahuapi.Backup) (ma
}

func (ctrl *controller) removeVolumeBackup(
backup *kahuapi.Backup) error {
backup *kahuapi.Backup, resources Resources) error {

pvcs, err := ctrl.getVolumes(backup, resources)
if err != nil {
ctrl.logger.Errorf("Volume backup validation failed. %s", err)

return err
}

volumeGroup, err := ctrl.volumeHandler.Group().ByPVCs(backup.Name, pvcs)
if err != nil {
ctrl.logger.Errorf("Failed to ensure volume group. %s", err)
return err
}

snapshotter, err := ctrl.volumeHandler.Snapshot().ByVolumeGroup(volumeGroup)
if err != nil {
return err
}
vbcList, err := ctrl.volumeBackupClient.List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.Set{
volumeContentBackupLabel: backup.Name,
Expand All @@ -155,6 +172,13 @@ func (ctrl *controller) removeVolumeBackup(
}

for _, vbc := range vbcList.Items {

snapName := vbc.Spec.BackupSourceRef.Name
err = snapshotter.Delete(snapName)
if err != nil {
return err
}

if vbc.DeletionTimestamp != nil { // ignore deleting volume backup content
continue
}
Expand Down
92 changes: 76 additions & 16 deletions controllers/snapshot/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ package snapshot

import (
"context"
"encoding/json"
"fmt"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand All @@ -46,9 +49,10 @@ import (
)

const (
controllerName = "snapshot-controller"
defaultReSyncTimeLoop = 30 * time.Minute
annSnapshotVolumeSync = "kahu.io/snapshot-volume-sync"
controllerName = "snapshot-controller"
defaultReSyncTimeLoop = 30 * time.Minute
annSnapshotVolumeSync = "kahu.io/snapshot-volume-sync"
volumeSnapshotFinalizer = "kahu.io/volume-snapshot-done"
)

type controller struct {
Expand Down Expand Up @@ -146,6 +150,53 @@ func (ctrl *controller) reSync() {
}
}

func (ctrl *controller) patchSnapshot(oldSnapshot, updatedSnapshot *kahuapi.VolumeSnapshot) (*kahuapi.VolumeSnapshot, error) {
origBytes, err := json.Marshal(oldSnapshot)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original backup")
}

updatedBytes, err := json.Marshal(updatedSnapshot)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated backup")
}

patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for backup")
}

newSnapshot, err := ctrl.kahuClient.KahuV1beta1().VolumeSnapshots().Patch(context.TODO(),
oldSnapshot.Name,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching backup")
}

_, err = utils.StoreRevisionUpdate(ctrl.processedSnapshot, newSnapshot, "Restore")
if err != nil {
return newSnapshot, errors.Wrap(err, "Failed to updated processed restore cache")
}

return newSnapshot, nil
}

func (ctrl *controller) HandleSnapshotDelete(snapshot *kahuapi.VolumeSnapshot) error {
err := ctrl.deleteCSISnapshot(snapshot)
if err != nil {
return err
}
snapshotClone := snapshot.DeepCopy()
utils.RemoveFinalizer(snapshotClone, volumeSnapshotFinalizer)
snapshot, err = ctrl.patchSnapshot(snapshot, snapshotClone)
if err != nil {
ctrl.logger.Errorf("removing finalizer failed for %s", snapshotClone.Name)
}
return nil
}

func (ctrl *controller) processQueue(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand All @@ -164,15 +215,7 @@ func (ctrl *controller) processQueue(key string) error {
return errors.Wrap(err, fmt.Sprintf("error getting snapshot %s from lister", name))
}

if snapshot.DeletionTimestamp != nil {
return nil
}

if snapshotHandled(snapshot) {
ctrl.logger.Infof("Volume Snapshot %s already handled", snapshot.Name)
return nil
}

newSnapshot := snapshot.DeepCopy()
newObj, err := utils.StoreRevisionUpdate(ctrl.processedSnapshot, snapshot, "Snapshot")
if err != nil {
ctrl.logger.Errorf("%s", err)
Expand All @@ -181,23 +224,32 @@ func (ctrl *controller) processQueue(key string) error {
return nil
}

if newSnapshot.DeletionTimestamp != nil {
return ctrl.HandleSnapshotDelete(snapshot)
}

if snapshotHandled(newSnapshot) {
ctrl.logger.Infof("Volume Snapshot %s already handled", newSnapshot.Name)
return nil
}

// Identify volumes for snapshot
snapshot, err = ctrl.syncSnapshotVolumes(snapshot)
newSnapshot, err = ctrl.syncSnapshotVolumes(newSnapshot)
if err != nil {
return err
}

// check volume snapshot support for CSI
support, err := ctrl.supportCSISnapshot(snapshot)
support, err := ctrl.supportCSISnapshot(newSnapshot)
if err != nil {
return err
}

if support {
return ctrl.handleCSISnapshot(snapshot)
return ctrl.handleCSISnapshot(newSnapshot)
}

return ctrl.handleSnapshot(snapshot)
return ctrl.handleSnapshot(newSnapshot)
}

func (ctrl *controller) syncSnapshotVolumes(snapshot *kahuapi.VolumeSnapshot) (*kahuapi.VolumeSnapshot, error) {
Expand Down Expand Up @@ -271,6 +323,14 @@ func (ctrl *controller) handleCSISnapshot(snapshot *kahuapi.VolumeSnapshot) erro
return err
}

func (ctrl *controller) deleteCSISnapshot(snapshot *kahuapi.VolumeSnapshot) error {
err := ctrl.csiSnapshotHandler.Run(snapshot.Name, func() error {
ctrl.logger.Infof("Volume Snapshot %s came till before delete func *****", snapshot.Name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

return ctrl.csiSnapshotter.Delete(snapshot)
})
return err
}

func (ctrl *controller) handleSnapshot(snapshot *kahuapi.VolumeSnapshot) error {
ctrl.logger.Info("Waiting for external snapshot controller to handle volume snapshot")
ctrl.eventRecorder.Event(snapshot, v1.EventTypeNormal, "ExternalSnapshotHandling",
Expand Down
37 changes: 36 additions & 1 deletion controllers/snapshot/csi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (

type Snapshoter interface {
Handle(snapshot *kahuapi.VolumeSnapshot) error
Delete(snapshot *kahuapi.VolumeSnapshot) error
}

type snapshoter struct {
Expand Down Expand Up @@ -76,6 +77,28 @@ func NewSnapshotter(ctx context.Context,
}, nil
}

func (s *snapshoter) Delete(snapshot *kahuapi.VolumeSnapshot) error {
//delete snapshot for each volumes
csiSnapshotClass, err := s.volSnapshotClassSyncer.SnapshotClassByProvider(*snapshot.Spec.SnapshotProvider)
if err != nil {
return err
}

for _, snapshotState := range snapshot.Status.SnapshotStates {
// ignore if already CSI object deleted
if snapshotState.CSISnapshotRef == nil {
continue
}
// delete CSI object
if err := s.deleteSnapshot(snapshot, csiSnapshotClass.Name, snapshotState.PVC); err != nil {
s.logger.Errorf("Error applying volume snapshot %s", err)
return err
}
}
return err

}

func (s *snapshoter) Handle(snapshot *kahuapi.VolumeSnapshot) error {
// create snapshot for each snapshot volumes
csiSnapshotClass, err := s.volSnapshotClassSyncer.SnapshotClassByProvider(*snapshot.Spec.SnapshotProvider)
Expand Down Expand Up @@ -108,7 +131,6 @@ func (s *snapshoter) Handle(snapshot *kahuapi.VolumeSnapshot) error {
s.logger.Errorf("Failed to update Volume Snapshot(%s) status, %s", snapshot.Name, err)
return err
}

kahuVolSnapshot.Status.ReadyToUse = &readyToUse
_, err = s.kahuClient.KahuV1beta1().
VolumeSnapshots().
Expand All @@ -121,6 +143,19 @@ func (s *snapshoter) Handle(snapshot *kahuapi.VolumeSnapshot) error {
return err
}

func (s *snapshoter) deleteSnapshot(kahuVolSnapshot *kahuapi.VolumeSnapshot,
snapshotClassName string,
pvc kahuapi.ResourceReference) error {
for _, states := range kahuVolSnapshot.Status.SnapshotStates {
s.logger.Infof("Deleting CSI Volume Snapshot(%s)", states.CSISnapshotRef.Name)
err := s.snapshotClient.VolumeSnapshots(pvc.Namespace).Delete(context.TODO(), states.CSISnapshotRef.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
}
return nil
}

func (s *snapshoter) applySnapshot(kahuVolSnapshotName string,
snapshotClassName string,
pvc kahuapi.ResourceReference) error {
Expand Down
17 changes: 11 additions & 6 deletions volume/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ import (
)

const (
LabelBackupName = "kahu.backup.name"
LabelProvisionerName = "kahu.provisioner.name"
LabelBackupName = "kahu.backup.name"
LabelProvisionerName = "kahu.provisioner.name"
volumeSnapshotFinalizer = "kahu.io/volume-snapshot-done"
)

type Interface interface {
Apply() error
WaitForSnapshotToReady(refName string, timeout time.Duration) error
Delete() (*kahuapi.VolumeSnapshot, error)
Delete(snapName string) error
GetSnapshots() ([]*kahuapi.VolumeSnapshot, error)
}

Expand Down Expand Up @@ -156,7 +157,9 @@ func (s *snapshoter) snapshotClassByPV(name string) (*corev1.PersistentVolume, e
func (s *snapshoter) kahuSnapshot(backup string,
provisioner string, volumes []kahuapi.ResourceReference) (*kahuapi.VolumeSnapshot, error) {
kahuSnapshot := s.volGroupToSnapshot(backup, provisioner, volumes)

if !utils.ContainsFinalizer(kahuSnapshot, volumeSnapshotFinalizer) {
utils.SetFinalizer(kahuSnapshot, volumeSnapshotFinalizer)
}
return s.kahuClient.KahuV1beta1().VolumeSnapshots().Create(context.TODO(), kahuSnapshot, metav1.CreateOptions{})
}

Expand All @@ -181,8 +184,10 @@ func (s *snapshoter) volGroupToSnapshot(backup string,
}
}

func (s *snapshoter) Delete() (*kahuapi.VolumeSnapshot, error) {
return nil, nil
func (s *snapshoter) Delete(snapName string) error {
s.logger.Infof("Deleting kahu snapshots(%v):", snapName)
err := s.kahuClient.KahuV1beta1().VolumeSnapshots().Delete(context.TODO(), snapName, metav1.DeleteOptions{})
return err
}

func (s *snapshoter) WaitForSnapshotToReady(refName string, timeout time.Duration) error {
Expand Down