Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: clean volumes when restore volume failed (#5634) #5639

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/apis/pingcap/v1alpha1/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ func IsRestoreVolumeComplete(restore *Restore) bool {
return condition != nil && condition.Status == corev1.ConditionTrue
}

// IsRestoreVolumeFailed returns true when a volume-snapshot mode Restore has
// failed before its volume restoration completed.
func IsRestoreVolumeFailed(restore *Restore) bool {
	if restore.Spec.Mode != RestoreModeVolumeSnapshot {
		return false
	}
	return IsRestoreFailed(restore) && !IsRestoreVolumeComplete(restore)
}

// IsCleanVolumeComplete returns true once the restored volumes have been
// cleaned, i.e. the CleanVolumeComplete condition is present and true.
func IsCleanVolumeComplete(restore *Restore) bool {
	if _, cond := GetRestoreCondition(&restore.Status, CleanVolumeComplete); cond != nil {
		return cond.Status == corev1.ConditionTrue
	}
	return false
}

// IsRestoreWarmUpStarted returns true if all the warmup jobs has successfully started
func IsRestoreWarmUpStarted(restore *Restore) bool {
_, condition := GetRestoreCondition(&restore.Status, RestoreWarmUpStarted)
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2396,6 +2396,8 @@ const (
// RestoreVolumeComplete means the Restore has successfully executed part-1 and the
// backup volumes have been rebuilt from the corresponding snapshot
RestoreVolumeComplete RestoreConditionType = "VolumeComplete"
// CleanVolumeComplete means volumes are cleaned successfully if restore volume failed
CleanVolumeComplete RestoreConditionType = "CleanVolumeComplete"
// RestoreWarmUpStarted means the Restore has successfully started warm up pods to
// initialize volumes restored from snapshots
RestoreWarmUpStarted RestoreConditionType = "WarmUpStarted"
Expand Down
28 changes: 28 additions & 0 deletions pkg/backup/restore/restore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
}
}

if v1alpha1.IsRestoreFailed(restore) {
return nil
}

restoreJobName := restore.GetRestoreJobName()
_, err = rm.deps.JobLister.Jobs(ns).Get(restoreJobName)
if err == nil {
Expand Down Expand Up @@ -518,6 +522,30 @@ func (rm *restoreManager) volumeSnapshotRestore(r *v1alpha1.Restore, tc *v1alpha
}
}

if v1alpha1.IsRestoreVolumeFailed(r) && !v1alpha1.IsCleanVolumeComplete(r) {
klog.Infof("%s/%s restore volume failed, start to clean volumes", ns, name)

csb, reason, err := rm.readRestoreMetaFromExternalStorage(r)
if err != nil {
return reason, err
}
s, reason, err := snapshotter.NewSnapshotterForRestore(r.Spec.Mode, rm.deps)
if err != nil {
return reason, err
}
if err := s.CleanVolumes(r, csb); err != nil {
return "CleanVolumeFailed", err
}
klog.Infof("%s/%s clean volumes successfully", ns, name)

if err := rm.statusUpdater.Update(r, &v1alpha1.RestoreCondition{
Type: v1alpha1.CleanVolumeComplete,
Status: corev1.ConditionTrue,
}, nil); err != nil {
return "UpdateCleanVolumeCompleteFailed", err
}
}

return "", nil
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/backup/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Snapshotter interface {

// AddVolumeTags add operator related tags to volumes
AddVolumeTags(pvs []*corev1.PersistentVolume) error

CleanVolumes(r *v1alpha1.Restore, csb *CloudSnapBackup) error
}

type BaseSnapshotter struct {
Expand Down Expand Up @@ -183,6 +185,18 @@ func (s *BaseSnapshotter) prepareRestoreMetadata(r *v1alpha1.Restore, csb *Cloud
return "", nil
}

// getRestoreVolumeIDs collects the IDs of the volumes recorded as restored in
// the backup metadata, skipping stores' volume entries with an empty ID.
func (s *BaseSnapshotter) getRestoreVolumeIDs(csb *CloudSnapBackup) []string {
	ids := make([]string, 0)
	for _, store := range csb.TiKV.Stores {
		for _, vol := range store.Volumes {
			if id := vol.RestoreVolumeID; id != "" {
				ids = append(ids, id)
			}
		}
	}
	return ids
}

func checkCloudSnapBackup(b *CloudSnapBackup) (string, error) {
if b == nil {
return "GetCloudSnapBackupFailed", errors.New("restore for CloudSnapBackup not found")
Expand Down
16 changes: 16 additions & 0 deletions pkg/backup/snapshotter/snapshotter_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,19 @@ func (s *AWSSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.Pe
}
}
}

// CleanVolumes deletes the EBS volumes that were created while restoring from
// snapshots. It refuses to run unless the restore is in the
// volume-restore-failed state, so it cannot delete volumes of a healthy
// restore.
func (s *AWSSnapshotter) CleanVolumes(r *v1alpha1.Restore, csb *CloudSnapBackup) error {
	if !v1alpha1.IsRestoreVolumeFailed(r) {
		return errors.New("can't clean volumes if not restore volume failed")
	}

	volumeIDs := s.getRestoreVolumeIDs(csb)
	// No volumes were recorded as restored, so there is nothing to clean.
	// Returning early also avoids creating an EC2 session and issuing a
	// DescribeVolumes call whose volume-id filter would carry no values.
	if len(volumeIDs) == 0 {
		return nil
	}

	ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency)
	if err != nil {
		return fmt.Errorf("new ec2 session error: %w", err)
	}
	if err := ec2Session.DeleteVolumes(volumeIDs); err != nil {
		return fmt.Errorf("delete volumes error: %w", err)
	}
	return nil
}
5 changes: 5 additions & 0 deletions pkg/backup/snapshotter/snapshotter_gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,8 @@ func (s *GCPSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
// TODO implement it if support to restore snapshots to another az on GCP
return nil
}

// CleanVolumes is currently a no-op on GCP; restoring snapshots is not yet
// supported there, so there are no restored volumes to clean.
func (s *GCPSnapshotter) CleanVolumes(r *v1alpha1.Restore, csb *CloudSnapBackup) error {
	// TODO implement it if support to restore snapshots on GCP
	return nil
}
4 changes: 4 additions & 0 deletions pkg/backup/snapshotter/snapshotter_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ func (s *NoneSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
// TODO implement it if support to restore snapshots to another az on GCP
return nil
}

// CleanVolumes is a no-op for the none snapshotter, which does not perform
// snapshot-based restores and therefore never creates volumes to clean.
func (s *NoneSnapshotter) CleanVolumes(r *v1alpha1.Restore, csb *CloudSnapBackup) error {
	return nil
}
65 changes: 64 additions & 1 deletion pkg/backup/util/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package util
import (
"context"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -164,7 +165,7 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string, deleteRatio fl
if err != nil {
if aErr, ok := err.(awserr.Error); ok {
if aErr.Code() == "InvalidSnapshot.NotFound" {
klog.Warningf("snapshot %s not found", snapID, err.Error())
klog.Warningf("snapshot %s not found, aws err: %s", snapID, err.Error())
return nil
}
}
Expand Down Expand Up @@ -204,6 +205,68 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string, deleteRatio fl
return nil
}

// DeleteVolumes deletes the given EBS volumes concurrently. IDs that no longer
// exist are filtered out up front by ListValidVolumes, and NotFound errors
// during deletion are tolerated, so the call is safe to retry.
func (e *EC2Session) DeleteVolumes(volumeIDs []string) error {
	// Nothing to do for an empty list. Returning early also avoids a
	// DescribeVolumes call whose volume-id filter would carry no values.
	if len(volumeIDs) == 0 {
		return nil
	}

	volumes, err := e.ListValidVolumes(volumeIDs)
	if err != nil {
		return errors.Annotate(err, "list volumes error")
	}

	eg, _ := errgroup.WithContext(context.Background())
	workerPool := NewWorkerPool(e.concurrency, "delete volumes")
	for _, volume := range volumes {
		volumeID := *volume.VolumeId
		workerPool.ApplyOnErrorGroup(eg, func() error {
			if _, err := e.EC2.DeleteVolume(&ec2.DeleteVolumeInput{VolumeId: aws.String(volumeID)}); err != nil {
				// The volume may have been removed by an earlier, partially
				// completed clean-up run; treat NotFound as success.
				if errors.IsNotFound(err) || strings.Contains(err.Error(), "NotFound") {
					klog.Warningf(
						"volume %s is not found, aws err: %s, skip deleting it", volumeID, err.Error())
					return nil
				}
				return errors.Annotatef(err, "delete volume %s error", volumeID)
			}
			klog.Infof("volume %s is deleted", volumeID)
			return nil
		})
	}

	return eg.Wait()
}

// ListValidVolumes describes the given volume IDs and returns only those that
// still exist, following NextToken pagination until AWS reports no more pages.
func (e *EC2Session) ListValidVolumes(volumeIDs []string) ([]*ec2.Volume, error) {
	// Guard against an empty ID list: a volume-id filter with zero values is
	// not a meaningful query, so bail out rather than rely on how AWS would
	// interpret an effectively unfiltered DescribeVolumes request.
	if len(volumeIDs) == 0 {
		return nil, nil
	}

	volumeIDPtrs := make([]*string, len(volumeIDs))
	for i, volumeID := range volumeIDs {
		volumeIDPtrs[i] = aws.String(volumeID)
	}

	volumes := make([]*ec2.Volume, 0, len(volumeIDs))
	var nextToken *string
	for {
		volumesOutput, err := e.EC2.DescribeVolumes(&ec2.DescribeVolumesInput{
			Filters: []*ec2.Filter{
				{
					Name:   aws.String("volume-id"),
					Values: volumeIDPtrs,
				},
			},
			NextToken: nextToken,
		})
		if err != nil {
			return nil, errors.Annotate(err, "describe volumes error")
		}

		volumes = append(volumes, volumesOutput.Volumes...)
		if volumesOutput.NextToken == nil {
			break
		}
		nextToken = volumesOutput.NextToken
	}
	return volumes, nil
}

func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error {

eg, _ := errgroup.WithContext(context.Background())
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/restore/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func (c *Controller) updateRestore(cur interface{}) {
return
}

if v1alpha1.IsRestoreVolumeFailed(newRestore) && !v1alpha1.IsCleanVolumeComplete(newRestore) {
// restore volume failed, need to clean created volumes
c.enqueueRestore(newRestore)
return
}

if v1alpha1.IsRestoreFailed(newRestore) {
klog.V(4).Infof("restore %s/%s is Failed, skipping.", ns, name)
return
Expand Down
Loading