Skip to content

Commit

Permalink
modify gc for all other reconcilers
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed Apr 7, 2022
1 parent 6d691f6 commit d8b1029
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 109 deletions.
34 changes: 27 additions & 7 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
// they match the Storage server hostname of current runtime.
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)

// Determine if the advertised artifact is still in storage
Expand Down Expand Up @@ -632,16 +635,33 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
return nil
}
if obj.GetArtifact() != nil {
if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
return &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %s", err),
Reason: "GarbageCollectionFailed",
delFilesChan := make(chan []string)
errChan := make(chan error)
go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan)
for {
select {
case <-ctx.Done():
err := context.DeadlineExceeded
r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
fmt.Sprintf("garbage collection of old artifacts failed: %s", err))
return err
case delFiles := <-delFilesChan:
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d old artifacts", len(delFiles)))
}
return nil
case err := <-errChan:
e := &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
return e
}
} else if len(deleted) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
"garbage collected old artifacts")
}
}

return nil
}

Expand Down
126 changes: 105 additions & 21 deletions controllers/bucket_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,22 @@ func TestBucketReconciler_Reconcile(t *testing.T) {
}, timeout).Should(BeTrue())
}

func TestBucketReconciler_reconcileStorage(t *testing.T) {
func TestBucketReconciler_garbageCollect(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
name string
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
wantErr string
assertPaths []string
ctxTimeout time.Duration
}{
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
revisions := []string{"a", "b", "c"}
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Path: fmt.Sprintf("/garbage-collect/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
Expand All @@ -189,24 +187,106 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
return err
}
if n != len(revisions)-1 {
time.Sleep(time.Second * 1)
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
want: sreconcile.ResultSuccess,
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/c.txt",
Revision: "c",
Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
Size: int64p(int64(len("c"))),
},
wantErr: "",
assertPaths: []string{
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
"/garbage-collect/d.txt",
"/garbage-collect/c.txt",
"!/garbage-collect/b.txt",
"!/garbage-collect/a.txt",
},
},
{
name: "garbage collection fails with context timeout",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/garbage-collect/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
return err
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
ctxTimeout: time.Second * 1,
wantErr: "context deadline exceeded",
assertPaths: []string{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed())
}()

r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}

obj := &sourcev1.Bucket{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
}
if tt.beforeFunc != nil {
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
ctx := context.TODO()
var cancel context.CancelFunc
if tt.ctxTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout)
defer cancel()
time.Sleep(tt.ctxTimeout * 2)
}
err := r.garbageCollect(ctx, obj)
if tt.wantErr != "" {
g.Expect(err).ToNot(BeNil())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
} else {
g.Expect(err).To(BeNil())
}

for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
g.Expect(absoluteP).NotTo(BeAnExistingFile())
}
})
}

}

func TestBucketReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "notices missing artifact in storage",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
Expand Down Expand Up @@ -259,6 +339,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()

r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
Expand Down
12 changes: 3 additions & 9 deletions controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,12 @@ type GitRepositoryReconciler struct {
Storage *Storage
ControllerName string

requeueDependency time.Duration
artifactRetentionTTL time.Duration
artifactRetentionRecords int
requeueDependency time.Duration
}

type GitRepositoryReconcilerOptions struct {
MaxConcurrentReconciles int
DependencyRequeueInterval time.Duration
ArtifactRetentionTTL time.Duration
ArtifactRetentionRecords int
}

// gitRepositoryReconcileFunc is the function type for all the
Expand All @@ -127,8 +123,6 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {

func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts GitRepositoryReconcilerOptions) error {
r.requeueDependency = opts.DependencyRequeueInterval
r.artifactRetentionRecords = opts.ArtifactRetentionRecords
r.artifactRetentionTTL = opts.ArtifactRetentionTTL

return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.GitRepository{}, builder.WithPredicates(
Expand Down Expand Up @@ -281,8 +275,8 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context,
obj *sourcev1.GitRepository, _ *git.Commit, _ *artifactSet, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 1/3 the requeue interval to avoid stalling the worker.
ctx, cancel := context.WithTimeout(ctx, time.Duration(obj.Spec.Interval.Duration.Seconds()/3))
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)

Expand Down
12 changes: 4 additions & 8 deletions controllers/gitrepository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,10 +1178,8 @@ func TestGitRepositoryReconciler_garbageCollect(t *testing.T) {
}()

r := &GitRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
artifactRetentionRecords: 2,
artifactRetentionTTL: 2 * time.Second,
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}

obj := &sourcev1.GitRepository{
Expand Down Expand Up @@ -1287,10 +1285,8 @@ func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) {
}()

r := &GitRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
artifactRetentionRecords: 2,
artifactRetentionTTL: 2 * time.Second,
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}

obj := &sourcev1.GitRepository{
Expand Down
33 changes: 26 additions & 7 deletions controllers/helmchart_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC
// they match the Storage server hostname of current runtime.
func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmChart, build *chart.Build) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)

// Determine if the advertised artifact is still in storage
Expand Down Expand Up @@ -801,14 +804,30 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1.
return nil
}
if obj.GetArtifact() != nil {
if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
return &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
delFilesChan := make(chan []string)
errChan := make(chan error)
go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan)
for {
select {
case <-ctx.Done():
err := context.DeadlineExceeded
r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
fmt.Sprintf("garbage collection of old artifacts failed: %s", err))
return err
case delFiles := <-delFilesChan:
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d old artifacts", len(delFiles)))
}
return nil
case err := <-errChan:
e := &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
return e
}
} else if len(deleted) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
"garbage collected old artifacts")
}
}
return nil
Expand Down
Loading

0 comments on commit d8b1029

Please sign in to comment.