diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 2809a64e1..bb98016de 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -371,9 +371,6 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, // they match the Storage server hostname of current runtime. func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage - // Abort if it takes more than 5 seconds. - ctx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() _ = r.garbageCollect(ctx, obj) // Determine if the advertised artifact is still in storage @@ -635,30 +632,20 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc return nil } if obj.GetArtifact() != nil { - delFilesChan := make(chan []string) - errChan := make(chan error) - go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan) - for { - select { - case <-ctx.Done(): - err := context.DeadlineExceeded - r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", - fmt.Sprintf("garbage collection of old artifacts failed: %s", err)) - return err - case delFiles := <-delFilesChan: - if len(delFiles) > 0 { - r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) - } - return nil - case err := <-errChan: - e := &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), - Reason: "GarbageCollectionFailed", - } - r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) - return e + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + Reason: "GarbageCollectionFailed", } + r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", + fmt.Sprintf("garbage collection of old artifacts failed: %s", e)) + return e + } + if len(delFiles) > 0 { + r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", + fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) + return nil } } diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index c8ecec5f7..07014e547 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -163,13 +163,15 @@ func TestBucketReconciler_Reconcile(t *testing.T) { }, timeout).Should(BeTrue()) } -func TestBucketReconciler_garbageCollect(t *testing.T) { +func TestBucketReconciler_reconcileStorage(t *testing.T) { tests := []struct { - name string - beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error - wantErr string - assertPaths []string - ctxTimeout time.Duration + name string + beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string }{ { name: "garbage collects", @@ -178,7 +180,7 @@ func TestBucketReconciler_garbageCollect(t *testing.T) { for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), Revision: v, } if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { @@ -194,99 +196,21 @@ func TestBucketReconciler_garbageCollect(t *testing.T) { testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, - wantErr: "", - assertPaths: []string{ - "/garbage-collect/d.txt", - "/garbage-collect/c.txt", - "!/garbage-collect/b.txt", - "!/garbage-collect/a.txt", + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, - }, - { - name: "garbage collection fails with context timeout", - beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { - revisions := []string{"a", "b", "c", "d"} - for n := range revisions { - v := revisions[n] - obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), - Revision: v, - } - if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { - return err - } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { - return err - } - } - testStorage.SetArtifactURL(obj.Status.Artifact) - return nil + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", }, - ctxTimeout: time.Second * 1, - wantErr: "context deadline exceeded", - assertPaths: []string{}, + want: sreconcile.ResultSuccess, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - - defer func() { - g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed()) - }() - - r := &BucketReconciler{ - EventRecorder: record.NewFakeRecorder(32), - Storage: testStorage, - } - - obj := &sourcev1.Bucket{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "test-", - }, - } - if tt.beforeFunc != nil { - g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) - } - ctx := context.TODO() - var cancel context.CancelFunc - if tt.ctxTimeout > 0 { - ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout) - defer cancel() - time.Sleep(tt.ctxTimeout * 2) - } - err := r.garbageCollect(ctx, obj) - if tt.wantErr != "" { - g.Expect(err).ToNot(BeNil()) - g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) - } else { - g.Expect(err).To(BeNil()) - } - - for _, p := range tt.assertPaths { - absoluteP := filepath.Join(testStorage.BasePath, p) - if !strings.HasPrefix(p, "!") { - g.Expect(absoluteP).To(BeAnExistingFile()) - continue - } - g.Expect(absoluteP).NotTo(BeAnExistingFile()) - } - }) - } - -} - -func TestBucketReconciler_reconcileStorage(t *testing.T) { - tests := []struct { - name string - beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error - want sreconcile.Result - wantErr bool - assertArtifact *sourcev1.Artifact - assertConditions []metav1.Condition - assertPaths []string - }{ { name: "notices missing artifact in storage", beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index 2d6e6b045..c104849a7 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -275,9 +275,6 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.GitRepository, _ *git.Commit, _ *artifactSet, _ string) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage - // Abort if it takes more than 5 seconds. - ctx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() _ = r.garbageCollect(ctx, obj) // Determine if the advertised artifact is still in storage @@ -711,30 +708,20 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc return nil } if obj.GetArtifact() != nil { - delFilesChan := make(chan []string) - errChan := make(chan error) - go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan) - for { - select { - case <-ctx.Done(): - err := context.DeadlineExceeded - r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", - fmt.Sprintf("garbage collection of old artifacts failed: %s", err)) - return err - case delFiles := <-delFilesChan: - if len(delFiles) > 0 { - r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) - } - return nil - case err := <-errChan: - e := &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), - Reason: "GarbageCollectionFailed", - } - r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) - return e + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + Reason: "GarbageCollectionFailed", } + r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", + fmt.Sprintf("garbage collection of old artifacts failed: %s", e)) + return e + } + if len(delFiles) > 0 { + r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", + fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/gitrepository_controller_test.go b/controllers/gitrepository_controller_test.go index 186baa2bd..b99a9e6bd 100644 --- a/controllers/gitrepository_controller_test.go +++ b/controllers/gitrepository_controller_test.go @@ -1104,13 +1104,15 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) { } } -func TestGitRepositoryReconciler_garbageCollect(t *testing.T) { +func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { tests := []struct { - name string - beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error - wantErr string - assertPaths []string - ctxTimeout time.Duration + name string + beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string }{ { name: "garbage collects", @@ -1119,7 +1121,7 @@ func TestGitRepositoryReconciler_garbageCollect(t *testing.T) { for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), Revision: v, } if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { @@ -1135,99 +1137,21 @@ func TestGitRepositoryReconciler_garbageCollect(t *testing.T) { testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, - wantErr: "", - assertPaths: []string{ - "/garbage-collect/d.txt", - "/garbage-collect/c.txt", - "!/garbage-collect/b.txt", - "!/garbage-collect/a.txt", + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, - }, - { - name: "garbage collection fails with context timeout", - beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { - revisions := []string{"a", "b", "c", "d"} - for n := range revisions { - v := revisions[n] - obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), - Revision: v, - } - if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { - return err - } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { - return err - } - } - testStorage.SetArtifactURL(obj.Status.Artifact) - return nil + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", }, - ctxTimeout: time.Second * 1, - wantErr: "context deadline exceeded", - assertPaths: []string{}, + want: sreconcile.ResultSuccess, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - - defer func() { - g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed()) - }() - - r := &GitRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), - Storage: testStorage, - } - - obj := &sourcev1.GitRepository{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "test-", - }, - } - if tt.beforeFunc != nil { - g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) - } - ctx := context.TODO() - var cancel context.CancelFunc - if tt.ctxTimeout > 0 { - ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout) - defer cancel() - time.Sleep(tt.ctxTimeout * 2) - } - err := r.garbageCollect(ctx, obj) - if tt.wantErr != "" { - g.Expect(err).ToNot(BeNil()) - g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) - } else { - g.Expect(err).To(BeNil()) - } - - for _, p := range tt.assertPaths { - absoluteP := filepath.Join(testStorage.BasePath, p) - if !strings.HasPrefix(p, "!") { - g.Expect(absoluteP).To(BeAnExistingFile()) - continue - } - g.Expect(absoluteP).NotTo(BeAnExistingFile()) - } - }) - } - -} - -func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) { - tests := []struct { - name string - beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error - want sreconcile.Result - wantErr bool - assertArtifact *sourcev1.Artifact - assertConditions []metav1.Condition - assertPaths []string - }{ { name: "notices missing artifact in storage", beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error { diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index 7ed0d60b5..e0205e113 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -804,30 +804,20 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1. return nil } if obj.GetArtifact() != nil { - delFilesChan := make(chan []string) - errChan := make(chan error) - go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan) - for { - select { - case <-ctx.Done(): - err := context.DeadlineExceeded - r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", - fmt.Sprintf("garbage collection of old artifacts failed: %s", err)) - return err - case delFiles := <-delFilesChan: - if len(delFiles) > 0 { - r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) - } - return nil - case err := <-errChan: - e := &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), - Reason: "GarbageCollectionFailed", - } - r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) - return e + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + Reason: "GarbageCollectionFailed", } + r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", + fmt.Sprintf("garbage collection of old artifacts failed: %s", e)) + return e + } + if len(delFiles) > 0 { + r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", + fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) + return nil } } return nil diff --git a/controllers/helmchart_controller_test.go b/controllers/helmchart_controller_test.go index 5d82bbab1..fdce5de6e 100644 --- a/controllers/helmchart_controller_test.go +++ b/controllers/helmchart_controller_test.go @@ -164,13 +164,15 @@ func TestHelmChartReconciler_Reconcile(t *testing.T) { }, timeout).Should(BeTrue()) } -func TestHelmChartReconciler_garbageCollect(t *testing.T) { +func TestHelmChartReconciler_reconcileStorage(t *testing.T) { tests := []struct { - name string - beforeFunc func(obj *sourcev1.HelmChart, storage *Storage) error - wantErr string - assertPaths []string - ctxTimeout time.Duration + name string + beforeFunc func(obj *sourcev1.HelmChart, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string }{ { name: "garbage collects", @@ -179,7 +181,7 @@ func TestHelmChartReconciler_garbageCollect(t *testing.T) { for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), Revision: v, } if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { @@ -195,99 +197,21 @@ func TestHelmChartReconciler_garbageCollect(t *testing.T) { testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, - wantErr: "", - assertPaths: []string{ - "/garbage-collect/d.txt", - "/garbage-collect/c.txt", - "!/garbage-collect/b.txt", - "!/garbage-collect/a.txt", + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, - }, - { - name: "garbage collection fails with context timeout", - beforeFunc: func(obj *sourcev1.HelmChart, storage *Storage) error { - revisions := []string{"a", "b", "c", "d"} - for n := range revisions { - v := revisions[n] - obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), - Revision: v, - } - if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { - return err - } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { - return err - } - } - testStorage.SetArtifactURL(obj.Status.Artifact) - return nil + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", }, - ctxTimeout: time.Second * 1, - wantErr: "context deadline exceeded", - assertPaths: []string{}, + want: sreconcile.ResultSuccess, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - - defer func() { - g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed()) - }() - - r := &HelmChartReconciler{ - EventRecorder: record.NewFakeRecorder(32), - Storage: testStorage, - } - - obj := &sourcev1.HelmChart{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "test-", - }, - } - if tt.beforeFunc != nil { - g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) - } - ctx := context.TODO() - var cancel context.CancelFunc - if tt.ctxTimeout > 0 { - ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout) - defer cancel() - time.Sleep(tt.ctxTimeout * 2) - } - err := r.garbageCollect(ctx, obj) - if tt.wantErr != "" { - g.Expect(err).ToNot(BeNil()) - g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) - } else { - g.Expect(err).To(BeNil()) - } - - for _, p := range tt.assertPaths { - absoluteP := filepath.Join(testStorage.BasePath, p) - if !strings.HasPrefix(p, "!") { - g.Expect(absoluteP).To(BeAnExistingFile()) - continue - } - g.Expect(absoluteP).NotTo(BeAnExistingFile()) - } - }) - } - -} - -func TestHelmChartReconciler_reconcileStorage(t *testing.T) { - tests := []struct { - name string - beforeFunc func(obj *sourcev1.HelmChart, storage *Storage) error - want sreconcile.Result - wantErr bool - assertArtifact *sourcev1.Artifact - assertConditions []metav1.Condition - assertPaths []string - }{ { name: "notices missing artifact in storage", beforeFunc: func(obj *sourcev1.HelmChart, storage *Storage) error { diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 410fd28ea..49d7d4cf2 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -518,31 +518,22 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour return nil } if obj.GetArtifact() != nil { - delFilesChan := make(chan []string) - errChan := make(chan error) - go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan) - for { - select { - case <-ctx.Done(): - err := context.DeadlineExceeded - r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", - fmt.Sprintf("garbage collection of old artifacts failed: %s", err)) - return err - case delFiles := <-delFilesChan: - if len(delFiles) > 0 { - r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", - fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) - } - return nil - case err := <-errChan: - e := &serror.Event{ - Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), - Reason: "GarbageCollectionFailed", - } - r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error()) - return e + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err), + Reason: "GarbageCollectionFailed", } + r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed", + fmt.Sprintf("garbage collection of old artifacts failed: %s", e)) + return e } + if len(delFiles) > 0 { + r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", + fmt.Sprintf("garbage collected %d old artifacts", len(delFiles))) + return nil + } + } return nil } diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index 6a162fc1d..7509766a3 100644 --- a/controllers/helmrepository_controller_test.go +++ b/controllers/helmrepository_controller_test.go @@ -134,13 +134,15 @@ func TestHelmRepositoryReconciler_Reconcile(t *testing.T) { }, timeout).Should(BeTrue()) } -func TestHelmRepositoryReconciler_garbageCollect(t *testing.T) { +func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { tests := []struct { - name string - beforeFunc func(obj *sourcev1.HelmRepository, storage *Storage) error - wantErr string - assertPaths []string - ctxTimeout time.Duration + name string + beforeFunc func(obj *sourcev1.HelmRepository, storage *Storage) error + want sreconcile.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string }{ { name: "garbage collects", @@ -149,7 +151,7 @@ func TestHelmRepositoryReconciler_garbageCollect(t *testing.T) { for n := range revisions { v := revisions[n] obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), Revision: v, } if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { @@ -165,99 +167,21 @@ func TestHelmRepositoryReconciler_garbageCollect(t *testing.T) { testStorage.SetArtifactURL(obj.Status.Artifact) return nil }, - wantErr: "", - assertPaths: []string{ - "/garbage-collect/d.txt", - "/garbage-collect/c.txt", - "!/garbage-collect/b.txt", - "!/garbage-collect/a.txt", + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/d.txt", + Revision: "d", + Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4", + URL: testStorage.Hostname + "/reconcile-storage/d.txt", + Size: int64p(int64(len("d"))), }, - }, - { - name: "garbage collection fails with context timeout", - beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error { - revisions := []string{"a", "b", "c", "d"} - for n := range revisions { - v := revisions[n] - obj.Status.Artifact = &sourcev1.Artifact{ - Path: fmt.Sprintf("/garbage-collect/%s.txt", v), - Revision: v, - } - if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { - return err - } - if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { - return err - } - } - testStorage.SetArtifactURL(obj.Status.Artifact) - return nil + assertPaths: []string{ + "/reconcile-storage/d.txt", + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", }, - ctxTimeout: time.Second * 1, - wantErr: "context deadline exceeded", - assertPaths: []string{}, + want: sreconcile.ResultSuccess, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := NewWithT(t) - - defer func() { - g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed()) - }() - - r := &HelmRepositoryReconciler{ - EventRecorder: record.NewFakeRecorder(32), - Storage: testStorage, - } - - obj := &sourcev1.HelmRepository{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "test-", - }, - } - if tt.beforeFunc != nil { - g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) - } - ctx := context.TODO() - var cancel context.CancelFunc - if tt.ctxTimeout > 0 { - ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout) - defer cancel() - time.Sleep(tt.ctxTimeout * 2) - } - err := r.garbageCollect(ctx, obj) - if tt.wantErr != "" { - g.Expect(err).ToNot(BeNil()) - g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) - } else { - g.Expect(err).To(BeNil()) - } - - for _, p := range tt.assertPaths { - absoluteP := filepath.Join(testStorage.BasePath, p) - if !strings.HasPrefix(p, "!") { - g.Expect(absoluteP).To(BeAnExistingFile()) - continue - } - g.Expect(absoluteP).NotTo(BeAnExistingFile()) - } - }) - } - -} - -func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) { - tests := []struct { - name string - beforeFunc func(obj *sourcev1.HelmRepository, storage *Storage) error - want sreconcile.Result - wantErr bool - assertArtifact *sourcev1.Artifact - assertConditions []metav1.Condition - assertPaths []string - }{ { name: "notices missing artifact in storage", beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error { diff --git a/controllers/storage.go b/controllers/storage.go index be0f057ea..d9358a2b1 100644 --- a/controllers/storage.go +++ b/controllers/storage.go @@ -19,6 +19,7 @@ package controllers import ( "archive/tar" "compress/gzip" + "context" "crypto/sha256" "fmt" "hash" @@ -207,7 +208,7 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ",")) } - // We already collected enough garbage files to satisfy the no of max + // We already collected enough garbage files to satisfy the no. of max // items that are supposed to be retained, so exit early. if totalFiles-len(garbageFiles) < maxItemsToBeRetained { return garbageFiles, nil @@ -228,11 +229,10 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m for _, path := range sortedPaths { if path != localPath && !stringInSlice(path, garbageFiles) { // If we previously collected a few garbage files with an expired ttl, then take that into account - // when checking whether we need to remove more files to satisfy the max no of items allowed - // in the filesystem, along with the no of files already removed in this loop. + // when checking whether we need to remove more files to satisfy the max no. of items allowed + // in the filesystem, along with the no. of files already removed in this loop. if noOfGarbageFiles > 0 { if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained { - // append path to garbageFiles and remove it from sortedPaths garbageFiles = append(garbageFiles, path) collected += 1 } @@ -250,29 +250,48 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, m // GarbageCollect removes all garabge files in the artifact dir according to the provided // retention options. -func (s *Storage) GarbageCollect(artifact sourcev1.Artifact, delFilesChan chan<- []string, errorChan chan<- error) { - garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL) - if err != nil { - errorChan <- err - return - } - var errors []error - var deleted []string - if len(garbageFiles) > 0 { - for _, file := range garbageFiles { - err := os.Remove(file) - if err != nil { - errors = append(errors, err) - } else { - deleted = append(deleted, file) +func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact, timeout time.Duration) ([]string, error) { + delFilesChan := make(chan []string) + errChan := make(chan error) + // Abort if it takes more than the provided timeout duration. + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + go func() { + garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL) + if err != nil { + errChan <- err + return + } + var errors []error + var deleted []string + if len(garbageFiles) > 0 { + for _, file := range garbageFiles { + err := os.Remove(file) + if err != nil { + errors = append(errors, err) + } else { + deleted = append(deleted, file) + } } } + if len(errors) > 0 { + errChan <- kerrors.NewAggregate(errors) + return + } + delFilesChan <- deleted + }() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case delFiles := <-delFilesChan: + return delFiles, nil + case err := <-errChan: + return nil, err + } } - if len(errors) > 0 { - errorChan <- kerrors.NewAggregate(errors) - return - } - delFilesChan <- deleted } func stringInSlice(a string, list []string) bool { diff --git a/controllers/storage_test.go b/controllers/storage_test.go index 9af7ccf46..08ab66f1e 100644 --- a/controllers/storage_test.go +++ b/controllers/storage_test.go @@ -19,6 +19,7 @@ package controllers import ( "archive/tar" "compress/gzip" + "context" "fmt" "io" "os" @@ -616,3 +617,89 @@ func TestStorage_getGarbageFiles(t *testing.T) { }) } } + +func TestStorage_GarbageCollect(t *testing.T) { + artifactFolder := path.Join("foo", "bar") + tests := []struct { + name string + artifactPaths []string + wantDeleted []string + wantErr string + ctxTimeout time.Duration + }{ + { + name: "garbage collects", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + }, + wantDeleted: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + }, + ctxTimeout: time.Second * 1, + }, + { + name: "garbage collection fails with context timeout", + artifactPaths: []string{ + path.Join(artifactFolder, "artifact1.tar.gz"), + path.Join(artifactFolder, "artifact2.tar.gz"), + path.Join(artifactFolder, "artifact3.tar.gz"), + path.Join(artifactFolder, "artifact4.tar.gz"), + }, + wantErr: "context deadline exceeded", + ctxTimeout: time.Nanosecond * 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + dir, err := os.MkdirTemp("", "") + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, err := NewStorage(dir, "hostname", time.Second*2, 2) + g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage") + + artifact := sourcev1.Artifact{ + Path: tt.artifactPaths[len(tt.artifactPaths)-1], + } + g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred()) + for i, artifactPath := range tt.artifactPaths { + f, err := os.Create(path.Join(dir, artifactPath)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(f.Close()).ToNot(HaveOccurred()) + if i != len(tt.artifactPaths)-1 { + time.Sleep(time.Second * 1) + } + } + + deletedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout) + if tt.wantErr == "" { + g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files") + } else { + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(ContainSubstring(tt.wantErr)) + } + if len(tt.wantDeleted) > 0 { + g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths))) + for _, wantDeletedPath := range tt.wantDeleted { + present := false + for _, deletedPath := range deletedPaths { + if strings.Contains(deletedPath, wantDeletedPath) { + g.Expect(deletedPath).ToNot(BeAnExistingFile()) + present = true + break + } + } + if present == false { + g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath)) + } + } + } + }) + } +}