[rel-v0.24] Enhancement: Implement Chunk Deletion for Multi-part Uploaded Files on GCP and Openstack Cloud Providers #682

Merged · 1 commit · Nov 22, 2023
37 changes: 25 additions & 12 deletions pkg/snapshot/snapshotter/garbagecollector.go
@@ -24,7 +24,6 @@ import (
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

// RunGarbageCollector basically considers the older backups as garbage and deletes them
@@ -59,6 +58,11 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
continue
}

// chunksDeleted stores the number of chunks deleted in the current iteration of GC
var chunksDeleted int
chunksDeleted, snapList = ssr.GarbageCollectChunks(snapList)
ssr.logger.Infof("GC: Total number garbage collected chunks: %d", chunksDeleted)

snapStreamIndexList := getSnapStreamIndexList(snapList)

switch ssr.config.GarbageCollectionPolicy {
@@ -142,7 +146,6 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
}
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
total++
garbageCollectChunks(ssr.store, snapList, snapStreamIndexList[snapStreamIndex-1]+1, snapStreamIndexList[snapStreamIndex])
}
}

@@ -167,7 +170,6 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
}
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
total++
garbageCollectChunks(ssr.store, snapList, snapStreamIndexList[snapStreamIndex]+1, snapStreamIndexList[snapStreamIndex+1])
}
}
}
@@ -193,24 +195,35 @@ func getSnapStreamIndexList(snapList brtypes.SnapList) []int {
return snapStreamIndexList
}

// garbageCollectChunks deletes the chunks in the store from snaplist starting at index low (inclusive) till high (exclusive).
func garbageCollectChunks(store brtypes.SnapStore, snapList brtypes.SnapList, low, high int) {
for index := low; index < high; index++ {
snap := snapList[index]
// Only delete chunk snapshots of kind Full
if snap.Kind != brtypes.SnapshotKindFull || !snap.IsChunk {
// GarbageCollectChunks removes obsolete chunks based on the latest recorded snapshot.
// It eliminates chunks associated with snapshots that have already been uploaded.
// Additionally, it avoids deleting chunks linked to snapshots that are currently being uploaded, so the garbage collector does not remove chunks before the composite object is formed.
func (ssr *Snapshotter) GarbageCollectChunks(snapList brtypes.SnapList) (int, brtypes.SnapList) {
var nonChunkSnapList brtypes.SnapList
chunksDeleted := 0
for _, snap := range snapList {
// If not a chunk, add it to the list and continue
if !snap.IsChunk {
nonChunkSnapList = append(nonChunkSnapList, snap)
continue
}
// Skip the chunk deletion if its corresponding full/delta snapshot is not uploaded yet
if ssr.prevSnapshot.LastRevision == 0 || snap.StartRevision > ssr.prevSnapshot.LastRevision {
continue
}
// delete the chunk object
snapPath := path.Join(snap.SnapDir, snap.SnapName)
logrus.Infof("GC: Deleting chunk for old full snapshot: %s", snapPath)
if err := store.Delete(*snap); err != nil {
logrus.Warnf("GC: Failed to delete snapshot %s: %v", snapPath, err)
ssr.logger.Infof("GC: Deleting chunk for old snapshot: %s", snapPath)
if err := ssr.store.Delete(*snap); err != nil {
ssr.logger.Warnf("GC: Failed to delete chunk %s: %v", snapPath, err)
metrics.SnapshotterOperationFailure.With(prometheus.Labels{metrics.LabelError: err.Error()}).Inc()
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindChunk, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Inc()
continue
}
chunksDeleted++
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindChunk, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
}
return chunksDeleted, nonChunkSnapList
}

/*
7 changes: 7 additions & 0 deletions pkg/snapstore/gcs_snapstore_test.go
@@ -50,6 +50,8 @@ func (m *mockBucketHandle) Object(name string) stiface.ObjectHandle {
}

func (m *mockBucketHandle) Objects(context.Context, *storage.Query) stiface.ObjectIterator {
m.client.objectMutex.Lock()
defer m.client.objectMutex.Unlock()
var keys []string
for key := range m.client.objects {
keys = append(keys, key)
@@ -65,6 +67,8 @@ type mockObjectHandle struct {
}

func (m *mockObjectHandle) NewReader(ctx context.Context) (stiface.Reader, error) {
m.client.objectMutex.Lock()
defer m.client.objectMutex.Unlock()
if value, ok := m.client.objects[m.object]; ok {
return &mockObjectReader{reader: io.NopCloser(bytes.NewReader(*value))}, nil
}
@@ -84,6 +88,8 @@ func (m *mockObjectHandle) ComposerFrom(objects ...stiface.ObjectHandle) stiface
}

func (m *mockObjectHandle) Delete(context.Context) error {
m.client.objectMutex.Lock()
defer m.client.objectMutex.Unlock()
if _, ok := m.client.objects[m.object]; ok {
delete(m.client.objects, m.object)
return nil
@@ -117,6 +123,7 @@ type mockComposer struct {

func (m *mockComposer) Run(ctx context.Context) (*storage.ObjectAttrs, error) {
dstWriter := m.dst.NewWriter(ctx)
defer dstWriter.Close()
for _, obj := range m.objectHandles {
r, err := obj.NewReader(ctx)
if err != nil {
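
The objectMutex additions above guard the mock client's shared objects map against concurrent access from test goroutines. The sketch below (hypothetical names, not the PR's code) shows the same lock-around-map pattern in isolation, assuming a store type roughly analogous to the mock GCS client.

package main

import (
	"fmt"
	"sync"
)

// objectStore is a hypothetical stand-in for the mock client's objects map,
// guarded the same way the diff guards it with objectMutex.
type objectStore struct {
	mu      sync.Mutex
	objects map[string][]byte
}

func (s *objectStore) put(name string, data []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.objects[name] = data
}

func (s *objectStore) delete(name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.objects[name]; ok {
		delete(s.objects, name)
		return true
	}
	return false
}

func main() {
	store := &objectStore{objects: map[string][]byte{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { // concurrent writers are safe because every access takes the mutex
			defer wg.Done()
			store.put(fmt.Sprintf("chunk-%d", i), []byte("data"))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(store.objects), store.delete("chunk-0"))
}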
24 changes: 14 additions & 10 deletions pkg/snapstore/snapstore_test.go
@@ -144,12 +144,12 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
objectMap[path.Join(prefixV1, snap2.SnapDir, snap2.SnapName)] = &expectedVal2

logrus.Infof("Running mock tests for %s when only v1 is present", key)
// List snap1 and snap3
// List snap1 and snap2
snapList, err := snapStore.List()
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(2))
Expect(snapList[0].SnapName).To(Equal(snap2.SnapName))
// Fetch snap3
// Fetch snap2
rc, err := snapStore.Fetch(*snapList[1])
Expect(err).ShouldNot(HaveOccurred())
defer rc.Close()
@@ -164,12 +164,13 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
snapList, err = snapStore.List()
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(prevLen - 1))
// Save snapshot
// reset the objectMap
resetObjectMap()
dummyData := make([]byte, 6*1024*1024)
// Save a new snapshot 'snap3'
err = snapStore.Save(snap3, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", 0))
Expect(len(objectMap)).Should(BeNumerically(">=", 1))
}
})
})
@@ -217,19 +218,20 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
err = snapStore.Delete(*snapList[2])
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).To(Equal(prevLen - 1))

// Save snapshot
// reset the objectMap
resetObjectMap()
// Save a new snapshot 'snap1'
dummyData := make([]byte, 6*1024*1024)
err = snapStore.Save(snap1, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", 0))
Expect(len(objectMap)).Should(BeNumerically(">=", 1))

// Save another new snapshot 'snap4'
prevLen = len(objectMap)
dummyData = make([]byte, 6*1024*1024)
err = snapStore.Save(snap4, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", prevLen))
Expect(len(objectMap)).Should(BeNumerically(">=", prevLen+1))
}
})
})
@@ -248,6 +250,7 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(2))
Expect(snapList[0].SnapName).To(Equal(snap4.SnapName))
Expect(snapList[1].SnapName).To(Equal(snap5.SnapName))
// Fetch snap5
rc, err := snapStore.Fetch(*snapList[1])
Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -263,12 +266,13 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
snapList, err = snapStore.List()
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(prevLen - 1))
// Save snapshot
// Reset the objectMap
resetObjectMap()
// Save a new snapshot 'snap4'
dummyData := make([]byte, 6*1024*1024)
err = snapStore.Save(snap4, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", 0))
Expect(len(objectMap)).Should(BeNumerically(">=", 1))
}
})
})
9 changes: 8 additions & 1 deletion pkg/snapstore/swift_snapstore_test.go
@@ -94,6 +94,9 @@ func handleCreateTextObject(w http.ResponseWriter, r *http.Request) {
// handleDownloadObject creates an HTTP handler at `/testContainer/testObject` on the test handler mux that
// responds with a `Download` response.
func handleDownloadObject(w http.ResponseWriter, r *http.Request) {
objectMapMutex.Lock()
defer objectMapMutex.Unlock()

prefix := parseObjectNamefromURL(r.URL)
var contents []byte
for key, val := range objectMap {
@@ -109,8 +112,10 @@ func handleDownloadObject(w http.ResponseWriter, r *http.Request) {
// handleListObjectNames creates an HTTP handler at `/testContainer` on the test handler mux that
// responds with a `List` response when only object names are requested.
func handleListObjectNames(w http.ResponseWriter, r *http.Request) {
marker := r.URL.Query().Get("marker")
objectMapMutex.Lock()
defer objectMapMutex.Unlock()

marker := r.URL.Query().Get("marker")
// Store the keys in a slice in sorted order
var keys, contents []string
for k := range objectMap {
@@ -132,6 +137,8 @@ func handleListObjectNames(w http.ResponseWriter, r *http.Request) {
// handleDeleteObject creates an HTTP handler at `/testContainer/testObject` on the test handler mux that
// responds with a `Delete` response.
func handleDeleteObject(w http.ResponseWriter, r *http.Request) {
objectMapMutex.Lock()
defer objectMapMutex.Unlock()
key := parseObjectNamefromURL(r.URL)
if _, ok := objectMap[key]; ok {
delete(objectMap, key)
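
For context on the Swift test handlers above: they are mounted on a test mux so the snapstore code under test talks to a local HTTP server instead of real OpenStack Swift. The sketch below is a generic illustration of that wiring, not the PR's code; the route paths mirror the handler comments, while the stub handler bodies and variable names are placeholders.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Hypothetical stand-ins for handleListObjectNames / handleDeleteObject above.
	listObjects := func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "testObject")
	}
	deleteObject := func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	}

	mux := http.NewServeMux()
	// Mirrors the routes described in the handler comments: the container path
	// serves listings, the object path serves per-object operations.
	mux.HandleFunc("/testContainer", listObjects)
	mux.HandleFunc("/testContainer/testObject", deleteObject)

	server := httptest.NewServer(mux)
	defer server.Close()

	resp, err := http.Get(server.URL + "/testContainer")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("list status:", resp.StatusCode)
}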