Skip to content

Commit

Permalink
Move snapshot delete into local/s3 functions
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Oct 5, 2023
1 parent cc6b50c commit 51ee06d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 74 deletions.
26 changes: 21 additions & 5 deletions pkg/etcd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info
return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}

// download downloads the given snapshot from the configured S3
// Download downloads the given snapshot from the configured S3
// compatible backend.
func (s *S3) Download(ctx context.Context) error {
snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
Expand Down Expand Up @@ -302,12 +302,12 @@ func (s *S3) snapshotRetention(ctx context.Context) error {

for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key)
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
return err
}
metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key))
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil {
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil {
if isNotExist(err) {
return nil
}
return err
Expand All @@ -317,13 +317,29 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
return nil
}

func (s *S3) deleteSnapshot(ctx context.Context, key string) error {
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()

key = path.Join(s.config.EtcdS3Folder, key)
err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, key, minio.RemoveObjectOptions{})
if err == nil || isNotExist(err) {
metadataKey := path.Join(path.Dir(key), metadataDir, path.Base(key))
if merr := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !isNotExist(merr) {
err = merr
}
}

return err
}

// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshots := map[string]snapshotFile{}
metadatas := []string{}
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()

opts := minio.ListObjectsOptions{
Expand Down
112 changes: 43 additions & 69 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math/rand"
"net/http"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -516,94 +517,60 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro
return snapshotFiles, err
}

// deleteSnapshots removes the given snapshots from
// either local storage or S3.
// DeleteSnapshots removes the given snapshots from local storage and S3.
func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
snapshotDir, err := snapshotDir(e.config, false)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}

logrus.Info("Removing the given locally stored etcd snapshot(s)")
logrus.Debugf("Attempting to remove the given locally stored etcd snapshot(s): %v", snapshots)

for _, s := range snapshots {
// check if the given snapshot exists. If it does,
// remove it, otherwise continue.
sf := filepath.Join(snapshotDir, s)
if _, err := os.Stat(sf); os.IsNotExist(err) {
logrus.Infof("Snapshot %s, does not exist", s)
continue
}
if err := os.Remove(sf); err != nil {
return err
}
logrus.Debug("Removed snapshot ", s)
}

if e.config.EtcdS3 {
if e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
if err := e.initS3IfNil(ctx); err != nil {
return err
}
logrus.Info("Removing the given etcd snapshot(s) from S3")
logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots)

objectsCh := make(chan minio.ObjectInfo)

ctx, cancel := context.WithTimeout(ctx, e.config.EtcdS3Timeout)
defer cancel()

go func() {
defer close(objectsCh)
}

opts := minio.ListObjectsOptions{
Recursive: true,
for _, s := range snapshots {
if err := e.deleteSnapshot(filepath.Join(snapshotDir, s)); err != nil {
if isNotExist(err) {
logrus.Infof("Snapshot %s not found locally", s)
} else {
logrus.Errorf("Failed to delete local snapshot %s: %v", s, err)
}
} else {
logrus.Infof("Snapshot %s deleted locally", s)
}

for obj := range e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, opts) {
if obj.Err != nil {
logrus.Errorf("Failed to list snapshots from S3: %v", obj.Err)
return
}

// iterate through the given snapshots and only
// add them to the channel for remove if they're
// actually found from the bucket listing.
for _, snapshot := range snapshots {
if snapshot == obj.Key {
objectsCh <- obj
}
}
}
}()

err = func() error {
for {
select {
case <-ctx.Done():
logrus.Errorf("Unable to delete snapshot: %v", ctx.Err())
return e.ReconcileSnapshotData(ctx)
case <-time.After(time.Millisecond * 100):
continue
case err, ok := <-e.s3.client.RemoveObjects(ctx, e.config.EtcdS3BucketName, objectsCh, minio.RemoveObjectsOptions{}):
if err.Err != nil {
logrus.Errorf("Unable to delete snapshot: %v", err.Err)
}
if !ok {
return e.ReconcileSnapshotData(ctx)
}
if e.config.EtcdS3 {
if err := e.s3.deleteSnapshot(s); err != nil {
if isNotExist(err) {
logrus.Infof("Snapshot %s not found in S3", s)
} else {
logrus.Errorf("Failed to delete S3 snapshot %s: %v", s, err)
}
} else {
logrus.Infof("Snapshot %s deleted from S3", s)
}
}()
if err != nil {
return err
}
}

return e.ReconcileSnapshotData(ctx)
}

func (e *ETCD) deleteSnapshot(snapshotPath string) error {
dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir)
filename := filepath.Base(snapshotPath)
metadataPath := filepath.Join(dir, filename)

err := os.Remove(snapshotPath)
if err == nil || os.IsNotExist(err) {
if merr := os.Remove(metadataPath); err != nil && !isNotExist(err) {
err = merr
}
}

return err
}

func marshalSnapshotFile(sf snapshotFile) ([]byte, error) {
if sf.metadataSource != nil {
if m, err := json.Marshal(sf.metadataSource.Data); err != nil {
Expand Down Expand Up @@ -947,6 +914,13 @@ func isTooLargeError(err error) bool {
return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long"))
}

func isNotExist(err error) bool {
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) {
return true
}
return false
}

// saveSnapshotMetadata writes extra metadata to disk.
// The upload is silently skipped if no extra metadata is provided.
func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error {
Expand Down

0 comments on commit 51ee06d

Please sign in to comment.