Skip to content

Commit

Permalink
make reconciler garbageCollect simpler
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed Apr 6, 2022
1 parent a1e8a5c commit 391d559
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 511 deletions.
39 changes: 13 additions & 26 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,6 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
// they match the Storage server hostname of current runtime.
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)

// Determine if the advertised artifact is still in storage
Expand Down Expand Up @@ -635,30 +632,20 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
return nil
}
if obj.GetArtifact() != nil {
delFilesChan := make(chan []string)
errChan := make(chan error)
go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan)
for {
select {
case <-ctx.Done():
err := context.DeadlineExceeded
r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
fmt.Sprintf("garbage collection of old artifacts failed: %s", err))
return err
case delFiles := <-delFilesChan:
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d old artifacts", len(delFiles)))
}
return nil
case err := <-errChan:
e := &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
return e
delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
fmt.Sprintf("garbage collection of old artifacts failed: %s", e))
return e
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d old artifacts", len(delFiles)))
return nil
}
}

Expand Down
118 changes: 21 additions & 97 deletions controllers/bucket_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ func TestBucketReconciler_Reconcile(t *testing.T) {
}, timeout).Should(BeTrue())
}

func TestBucketReconciler_garbageCollect(t *testing.T) {
func TestBucketReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
wantErr string
assertPaths []string
ctxTimeout time.Duration
name string
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "garbage collects",
Expand All @@ -178,7 +180,7 @@ func TestBucketReconciler_garbageCollect(t *testing.T) {
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/garbage-collect/%s.txt", v),
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
Expand All @@ -194,99 +196,21 @@ func TestBucketReconciler_garbageCollect(t *testing.T) {
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
wantErr: "",
assertPaths: []string{
"/garbage-collect/d.txt",
"/garbage-collect/c.txt",
"!/garbage-collect/b.txt",
"!/garbage-collect/a.txt",
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
},
{
name: "garbage collection fails with context timeout",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/garbage-collect/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
return err
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
ctxTimeout: time.Second * 1,
wantErr: "context deadline exceeded",
assertPaths: []string{},
want: sreconcile.ResultSuccess,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed())
}()

r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}

obj := &sourcev1.Bucket{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
}
if tt.beforeFunc != nil {
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
ctx := context.TODO()
var cancel context.CancelFunc
if tt.ctxTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout)
defer cancel()
time.Sleep(tt.ctxTimeout * 2)
}
err := r.garbageCollect(ctx, obj)
if tt.wantErr != "" {
g.Expect(err).ToNot(BeNil())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
} else {
g.Expect(err).To(BeNil())
}

for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
g.Expect(absoluteP).NotTo(BeAnExistingFile())
}
})
}

}

func TestBucketReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "notices missing artifact in storage",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
Expand Down
39 changes: 13 additions & 26 deletions controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,6 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context,
obj *sourcev1.GitRepository, _ *git.Commit, _ *artifactSet, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)

// Determine if the advertised artifact is still in storage
Expand Down Expand Up @@ -711,30 +708,20 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
return nil
}
if obj.GetArtifact() != nil {
delFilesChan := make(chan []string)
errChan := make(chan error)
go r.Storage.GarbageCollect(*obj.GetArtifact(), delFilesChan, errChan)
for {
select {
case <-ctx.Done():
err := context.DeadlineExceeded
r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
fmt.Sprintf("garbage collection of old artifacts failed: %s", err))
return err
case delFiles := <-delFilesChan:
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d old artifacts", len(delFiles)))
}
return nil
case err := <-errChan:
e := &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
return e
delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
r.eventLogf(ctx, obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
fmt.Sprintf("garbage collection of old artifacts failed: %s", e))
return e
}
if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
fmt.Sprintf("garbage collected %d old artifacts", len(delFiles)))
return nil
}
}
return nil
Expand Down
118 changes: 21 additions & 97 deletions controllers/gitrepository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,13 +1104,15 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) {
}
}

func TestGitRepositoryReconciler_garbageCollect(t *testing.T) {
func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error
wantErr string
assertPaths []string
ctxTimeout time.Duration
name string
beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "garbage collects",
Expand All @@ -1119,7 +1121,7 @@ func TestGitRepositoryReconciler_garbageCollect(t *testing.T) {
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/garbage-collect/%s.txt", v),
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
Expand All @@ -1135,99 +1137,21 @@ func TestGitRepositoryReconciler_garbageCollect(t *testing.T) {
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
wantErr: "",
assertPaths: []string{
"/garbage-collect/d.txt",
"/garbage-collect/c.txt",
"!/garbage-collect/b.txt",
"!/garbage-collect/a.txt",
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
},
{
name: "garbage collection fails with context timeout",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/garbage-collect/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
return err
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
ctxTimeout: time.Second * 1,
wantErr: "context deadline exceeded",
assertPaths: []string{},
want: sreconcile.ResultSuccess,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/garbage-collect"))).To(Succeed())
}()

r := &GitRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}

obj := &sourcev1.GitRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
}
if tt.beforeFunc != nil {
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
ctx := context.TODO()
var cancel context.CancelFunc
if tt.ctxTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, tt.ctxTimeout)
defer cancel()
time.Sleep(tt.ctxTimeout * 2)
}
err := r.garbageCollect(ctx, obj)
if tt.wantErr != "" {
g.Expect(err).ToNot(BeNil())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
} else {
g.Expect(err).To(BeNil())
}

for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
g.Expect(absoluteP).NotTo(BeAnExistingFile())
}
})
}

}

func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "notices missing artifact in storage",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
Expand Down
Loading

0 comments on commit 391d559

Please sign in to comment.