From 18c2f290789bb27224134bbf7e91502753eb9abe Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Wed, 30 Mar 2022 01:26:03 +0530 Subject: [PATCH] fix garbage collection logic and add tests Signed-off-by: Sanskar Jaiswal --- controllers/gitrepository_controller_test.go | 148 +++++++++++++++++++ controllers/storage.go | 49 ++++-- controllers/storage_test.go | 105 +++++++++++++ main.go | 16 +- 4 files changed, 302 insertions(+), 16 deletions(-) diff --git a/controllers/gitrepository_controller_test.go b/controllers/gitrepository_controller_test.go index 7b6aeba35..ad3b88d16 100644 --- a/controllers/gitrepository_controller_test.go +++ b/controllers/gitrepository_controller_test.go @@ -1102,6 +1102,154 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { } } +func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { + tests := []struct { + name string + beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string + }{ + { + name: "garbage collects", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + revisions := []string{"a", "b", "c", "d"} + for n := range revisions { + v := revisions[n] + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), + Revision: v, + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + return err + } + if n != len(revisions)-1 { + time.Sleep(time.Second * 1) + } + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), + }, + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", + }, + want: sreconcile.ResultSuccess, + }, + { + name: "notices missing artifact in storage", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "/reconcile-storage/invalid.txt", + Revision: "e", + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + want: sreconcile.ResultSuccess, + assertPaths: []string{ + "!/reconcile-storage/invalid.txt", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "no artifact for resource in storage"), + }, + }, + { + name: "updates hostname on diff from current", + beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80", + URL: "http://outdated.com/reconcile-storage/hostname.txt", + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + return err + } + return nil + }, + want: sreconcile.ResultSuccess, + assertPaths: []string{ + "/reconcile-storage/hostname.txt", + }, + assertArtifact: &sourcev1.Artifact{ + Path: 
"/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80", + URL: testStorage.Hostname + "/reconcile-storage/hostname.txt", + Size: int64p(int64(len("file"))), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + defer func() { + for _, p := range tt.assertPaths { + if !strings.HasPrefix(p, "!") { + g.Expect(os.Remove(filepath.Join(testStorage.BasePath, p))).To(Succeed()) + } + } + }() + + r := &GitRepositoryReconciler{ + EventRecorder: record.NewFakeRecorder(32), + Storage: testStorage, + artifactRetentionRecords: 2, + artifactRetentionTTL: 2 * time.Second, + } + + obj := &sourcev1.GitRepository{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, + } + if tt.beforeFunc != nil { + g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) + } + + var c *git.Commit + var as artifactSet + got, err := r.reconcileStorage(context.TODO(), obj, c, &as, "") + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact)) + if tt.assertArtifact != nil && tt.assertArtifact.URL != "" { + g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL)) + } + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + for _, p := range tt.assertPaths { + absoluteP := filepath.Join(testStorage.BasePath, p) + if !strings.HasPrefix(p, "!") { + g.Expect(absoluteP).To(BeAnExistingFile()) + continue + } + g.Expect(absoluteP).NotTo(BeAnExistingFile()) + } + }) + } +} + func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) { g := NewWithT(t) diff --git a/controllers/storage.go b/controllers/storage.go index 6a2e7b748..19df9b702 100644 --- a/controllers/storage.go +++ b/controllers/storage.go @@ -146,7 +146,12 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err return deletedFiles, nil } -func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, n int, ttl time.Duration) []string { +// getGarbageFiles returns all files that need to be garbage collected for the given artifact. +// Garbage files are determined based on the below flow: +// 1. collect all files with an expired ttl +// 2. if we satisfy maxItemsToBeRetained, then return +// 3. else, remove all files till the latest n files remain, where n=maxItemsToBeRetained +func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetained int, ttl time.Duration) []string { localPath := s.LocalPath(artifact) dir := filepath.Dir(localPath) garbageFiles := []string{} @@ -154,20 +159,28 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, n int, ttl time.Du // sortedPaths contain all files sorted according to their created ts. sortedPaths := []string{} now := time.Now().UTC() + totalFiles := 0 _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { createdAt := info.ModTime().UTC() diff := now.Sub(createdAt) // compare the time difference between now and the time at which the file was created // with the provided ttl. delete if difference is greater than the ttl. 
 		expired := diff > ttl
-		if path != localPath && !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink && expired {
-			garbageFiles = append(garbageFiles, path)
-		}
 		if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink {
+			if path != localPath && expired {
+				garbageFiles = append(garbageFiles, path)
+			}
+			totalFiles += 1
 			filesWithCreatedTs[info.ModTime().UTC()] = path
 		}
 		return nil
 	})
+	// We already collected enough garbage files to satisfy the maximum number of
+	// items to be retained, so exit early.
+	if totalFiles-len(garbageFiles) < maxItemsToBeRetained {
+		return garbageFiles
+	}
+
 	creationTimestamps := []time.Time{}
 	for ts := range filesWithCreatedTs {
 		creationTimestamps = append(creationTimestamps, ts)
@@ -182,11 +195,25 @@
 		sortedPaths = append(sortedPaths, path)
 	}
-	for i, path := range sortedPaths {
-		if path != localPath && !stringInSlice(path, garbageFiles) && len(sortedPaths) <= n {
-			// append path to garbageFiles and remove it from sortedPaths
-			garbageFiles = append(garbageFiles, path)
-			sortedPaths = append(sortedPaths[:i], sortedPaths[i+1:]...)
+	var collected int
+	noOfGarbageFiles := len(garbageFiles)
+	for _, path := range sortedPaths {
+		if path != localPath && !stringInSlice(path, garbageFiles) {
+			// If we previously collected garbage files with an expired ttl, take that into account
+			// when checking whether we need to remove more files to satisfy the maximum number of
+			// items allowed in the filesystem, along with the files already removed in this loop.
+			if noOfGarbageFiles > 0 {
+				if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained {
+					// append path to garbageFiles and remove it from sortedPaths
+					garbageFiles = append(garbageFiles, path)
+					collected += 1
+				}
+			} else {
+				if len(sortedPaths)-collected > maxItemsToBeRetained {
+					garbageFiles = append(garbageFiles, path)
+					collected += 1
+				}
+			}
 		}
 	}
@@ -195,8 +222,8 @@
 // RemoveGarbageFiles removes all garbage files in the artifact dir according to the provided
 // retention options.
-func (s *Storage) RemoveGarbageFiles(artifact sourcev1.Artifact, n int, ttl time.Duration) ([]string, error) { - garbageFiles := s.getGarbageFiles(artifact, n, ttl) +func (s *Storage) RemoveGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) { + garbageFiles := s.getGarbageFiles(artifact, maxItemsToBeRetained, ttl) var errors []string var deleted []string if len(garbageFiles) > 0 { diff --git a/controllers/storage_test.go b/controllers/storage_test.go index 7da575c64..ea94feca3 100644 --- a/controllers/storage_test.go +++ b/controllers/storage_test.go @@ -24,6 +24,7 @@ import ( "os" "path" "path/filepath" + "strings" "testing" "time" @@ -486,3 +487,107 @@ func TestStorageCopyFromPath(t *testing.T) { }) } } + +func TestStorage_getGarbageFiles(t *testing.T) { + artifactFolder := path.Join("foo", "bar") + tests := []struct { + name string + artifactPaths []string + createPause time.Duration + ttl time.Duration + maxItemsToBeRetained int + wantDeleted []string + }{ + { + name: "delete files based on maxItemsToBeRetained", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + }, + createPause: time.Nanosecond, + ttl: time.Minute * 2, + maxItemsToBeRetained: 2, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + }, + }, + { + name: "delete files based on ttl", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + }, + createPause: time.Second * 1, + ttl: time.Second*3 + time.Millisecond*500, + maxItemsToBeRetained: 4, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + }, + { + name: "delete files based on ttl and maxItemsToBeRetained", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + path.Join(artifactFolder, "artifact5.tar.gz"), + path.Join(artifactFolder, "artifact6.tar.gz"), + }, + createPause: time.Second * 1, + ttl: time.Second*5 + time.Millisecond*500, + maxItemsToBeRetained: 4, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + dir, err := os.MkdirTemp("", "") + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, err := NewStorage(dir, "hostname", time.Minute) + g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") + + artifact := sourcev1.Artifact{ + Path: tt.artifactPaths[len(tt.artifactPaths)-1], + } + g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0755)).ToNot(HaveOccurred()) + for _, artifactPath := range tt.artifactPaths { + f, err := os.Create(path.Join(dir, artifactPath)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(f.Close()).ToNot(HaveOccurred()) + time.Sleep(tt.createPause) + } + + deletedPaths 
:= s.getGarbageFiles(artifact, tt.maxItemsToBeRetained, tt.ttl) + g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths))) + for _, wantDeletedPath := range tt.wantDeleted { + present := false + for _, deletedPath := range deletedPaths { + if strings.Contains(deletedPath, wantDeletedPath) { + present = true + break + } + } + if present == false { + g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath)) + } + } + }) + } +} diff --git a/main.go b/main.go index e9ce1dae7..081b95fdf 100644 --- a/main.go +++ b/main.go @@ -113,9 +113,9 @@ func main() { flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.") flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 30*time.Second, - "The duration for which artifacts be persisted in storage before being evicted.") + "The duration of time that artifacts will be kept in storage before being garbage collected.") flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2, - "The number of artifacts allowed to be present in storage.") + "The maximum number of artifacts to be kept in storage after a garbage collection.") clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) @@ -194,7 +194,9 @@ func main() { Getters: getters, ControllerName: controllerName, }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ - MaxConcurrentReconciles: concurrent, + MaxConcurrentReconciles: concurrent, + ArtifactRetentionTTL: artifactRetentionTTL, + ArtifactRetentionRecords: artifactRetentionRecords, }); err != nil { setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind) os.Exit(1) @@ -207,7 +209,9 @@ func main() { Metrics: metricsH, ControllerName: controllerName, }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{ - MaxConcurrentReconciles: concurrent, + MaxConcurrentReconciles: concurrent, + ArtifactRetentionTTL: artifactRetentionTTL, + ArtifactRetentionRecords: artifactRetentionRecords, }); err != nil { setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmChartKind) os.Exit(1) @@ -219,7 +223,9 @@ func main() { Storage: storage, ControllerName: controllerName, }).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{ - MaxConcurrentReconciles: concurrent, + MaxConcurrentReconciles: concurrent, + ArtifactRetentionTTL: artifactRetentionTTL, + ArtifactRetentionRecords: artifactRetentionRecords, }); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Bucket") os.Exit(1)
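
Reviewer note: the retention flow that getGarbageFiles now implements (first collect everything past its TTL, then trim the survivors down to the newest maxItemsToBeRetained entries) can be summarized with a small, self-contained sketch. This is only an illustration for discussion, not the controller's code: the function name garbageFiles, the standalone main, and the "artifacts" directory are assumptions made for the example and are not part of the source-controller Storage API.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"time"
)

// garbageFiles lists the files under dir that would be garbage collected:
// first everything (except the current artifact) older than ttl, then the
// oldest of the remaining files until at most maxItemsToBeRetained are left.
func garbageFiles(dir, current string, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
	type entry struct {
		path string
		mod  time.Time
	}
	var files []entry
	now := time.Now()

	err := filepath.Walk(dir, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		// Skip directories and symlinks, mirroring the checks in storage.go.
		if info.IsDir() || info.Mode()&os.ModeSymlink == os.ModeSymlink {
			return nil
		}
		files = append(files, entry{path: path, mod: info.ModTime()})
		return nil
	})
	if err != nil {
		return nil, err
	}

	var garbage []string
	var remaining []entry
	// Phase 1: TTL-based collection. The current artifact is never collected.
	for _, f := range files {
		if f.path != current && now.Sub(f.mod) > ttl {
			garbage = append(garbage, f.path)
			continue
		}
		remaining = append(remaining, f)
	}

	// Phase 2: count-based collection. If more than maxItemsToBeRetained files
	// survived phase 1, drop the oldest survivors until the cap is met.
	sort.Slice(remaining, func(i, j int) bool { return remaining[i].mod.Before(remaining[j].mod) })
	excess := len(remaining) - maxItemsToBeRetained
	for _, f := range remaining {
		if excess <= 0 {
			break
		}
		if f.path == current {
			continue
		}
		garbage = append(garbage, f.path)
		excess--
	}
	return garbage, nil
}

func main() {
	// Keep at most 2 artifacts younger than 30s under ./artifacts, never touching current.tar.gz.
	paths, err := garbageFiles("artifacts", filepath.Join("artifacts", "current.tar.gz"), 2, 30*time.Second)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, p := range paths {
		fmt.Println("would delete:", p)
	}
}

Collecting by TTL first and only then trimming by count is what lets the early return in getGarbageFiles skip the second pass entirely when the TTL sweep already brings the file count under the retention cap.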