diff --git a/internal/controller/bucket/create.go b/internal/controller/bucket/create.go index 826aa9ec..933cab07 100644 --- a/internal/controller/bucket/create.go +++ b/internal/controller/bucket/create.go @@ -29,7 +29,7 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext defer cancel() if bucket.Spec.Disabled { - c.log.Info("Bucket is disabled - no buckets to be created on backends", "bucket name", bucket.Name) + c.log.Info("Bucket is disabled - no buckets to be created on backends", "bucket_name", bucket.Name) return managed.ExternalCreation{}, nil } @@ -50,8 +50,10 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalCreation{}, errors.New(errMissingS3Backend) } - updated := atomic.Bool{} - errorsLeft := 0 + // This value shows a bucket on one backend is already created. + // It is used to prevent goroutines from sending duplicated messages to `readyChan`. + bucketAlreadyCreated := atomic.Bool{} + backendCount := 0 errChan := make(chan error, len(activeBackends)) readyChan := make(chan string) @@ -60,21 +62,22 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext cl := c.backendStore.GetBackendClient(beName) if cl == nil { - c.log.Info("Backend client not found for backend - bucket cannot be created on backend", "bucket name", originalBucket.Name, "backend name", beName) + c.log.Info("Backend client not found for backend - bucket cannot be created on backend", "bucket_name", originalBucket.Name, "backend_name", beName) continue } - c.log.Info("Creating bucket", "bucket name", originalBucket.Name, "backend name", beName) + c.log.Info("Creating bucket on backend", "bucket_name", originalBucket.Name, "backend_name", beName) pc := &apisv1alpha1.ProviderConfig{} if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: beName}, pc); err != nil { - c.log.Info("Failed to fetch provider config", "backend name", beName, "bucket_name", originalBucket.Name, "err", err.Error()) + c.log.Info("Failed to fetch provider config", "bucket_name", originalBucket.Name, "backend_name", beName, "err", err.Error()) return managed.ExternalCreation{}, errors.Wrap(err, errGetPC) } - errorsLeft++ + // Increment the backend counter. We need this later to know when the operation should finish. + backendCount++ beName := beName go func() { @@ -83,53 +86,59 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext for i := 0; i < s3internal.RequestRetries; i++ { _, err = s3internal.CreateBucket(ctx, cl, s3internal.BucketToCreateBucketInput(originalBucket)) if resource.Ignore(s3internal.IsAlreadyExists, err) == nil { + c.log.Info("Bucket created on backend", "bucket_name", bucket.Name, "backend_name", beName) + break } } + if err != nil { + c.log.Info("Failed to create bucket on backend", "bucket_name", originalBucket.Name, "backend_name", beName, "err", err.Error()) - if !updated.CompareAndSwap(false, true) { - c.log.Info("Bucket already updated", "bucket_name", originalBucket.Name) - - errChan <- nil + errChan <- err return } - if err != nil { - c.log.Info("Failed to create bucket on backend", "backend name", beName, "bucket_name", originalBucket.Name, "err", err.Error()) + // This compare-and-swap operation is the atomic equivalent of: + // if *bucketAlreadyCreated == false { + // *bucketAlreadyCreated = true + // return true + // } + // return false + if !bucketAlreadyCreated.CompareAndSwap(false, true) { + c.log.Info("Bucket already created on backend - terminate thread without error", "bucket_name", originalBucket.Name, "backend_name", beName) - errChan <- err + errChan <- nil return } + // Once a bucket is created successfully on ANY backend, the bucket is considered ready. + // Therefore we send the name of the backend on which the bucket is first created to the ready channel. readyChan <- beName errChan <- nil }() } - if errorsLeft == 0 { + if backendCount == 0 { c.log.Info("Failed to find any backend for bucket", "bucket_name", bucket.Name) return managed.ExternalCreation{}, nil } - return c.waitForCreationAndUpdateObject(ctx, bucket, readyChan, errChan, errorsLeft) + return c.waitForCreationAndUpdateBucketCR(ctx, bucket, readyChan, errChan, backendCount) } -func (c *external) waitForCreationAndUpdateObject(ctx context.Context, bucket *v1alpha1.Bucket, readyChan <-chan string, errChan <-chan error, errorsLeft int) (managed.ExternalCreation, error) { +func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket *v1alpha1.Bucket, readyChan <-chan string, errChan <-chan error, backendCount int) (managed.ExternalCreation, error) { var err error -WAIT: - for { + for i := 0; i < backendCount; i++ { select { case <-ctx.Done(): - c.log.Info("Context timeout", "bucket_name", bucket.Name) + c.log.Info("Context timeout waiting for bucket creation", "bucket_name", bucket.Name) return managed.ExternalCreation{}, ctx.Err() case beName := <-readyChan: - c.log.Info("Bucket created", "backend name", beName, "bucket_name", bucket.Name) - err := c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired { // Remove the annotation, because Crossplane is not always able to do it. // This workaround doesn't eliminates the problem, if this update fails, @@ -154,25 +163,17 @@ WAIT: return NeedsStatusUpdate }) if err != nil { - c.log.Info("Failed to update Bucket CR", "backend name", beName, "bucket_name", bucket.Name) + c.log.Info("Failed to update Bucket CR with backend info", "bucket_name", bucket.Name, "backend_name", beName, "err", err.Error()) } return managed.ExternalCreation{}, err - case err = <-errChan: - errorsLeft-- - - if err != nil { - c.log.Info("Failed to create on backend", "bucket_name", bucket.Name, "err", err.Error()) - - if errorsLeft > 0 { - continue - } - - break WAIT - } + case <-errChan: + continue } } + c.log.Info("Failed to create bucket on any backend", "bucket_name", bucket.Name) + err = c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired { bucketLatest.Status.SetConditions(xpv1.Unavailable()) diff --git a/internal/controller/bucket/create_test.go b/internal/controller/bucket/create_test.go index ff569fe4..e59fb322 100644 --- a/internal/controller/bucket/create_test.go +++ b/internal/controller/bucket/create_test.go @@ -3,7 +3,10 @@ package bucket import ( "context" "testing" + "time" + "github.com/aws/aws-sdk-go-v2/service/s3" + v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" @@ -12,16 +15,18 @@ import ( "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" apisv1alpha1 "github.com/linode/provider-ceph/apis/v1alpha1" "github.com/linode/provider-ceph/internal/backendstore" + "github.com/linode/provider-ceph/internal/backendstore/backendstorefakes" "github.com/pkg/errors" - "k8s.io/client-go/kubernetes/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +//nolint:maintidx,paralleltest // Function requires numerous checks. Running in parallel causes issues with client. func TestCreate(t *testing.T) { - t.Parallel() - type fields struct { - backendStore *backendstore.BackendStore + backendStore *backendstore.BackendStore + providerConfigs *apisv1alpha1.ProviderConfigList } type args struct { @@ -29,8 +34,9 @@ func TestCreate(t *testing.T) { } type want struct { - o managed.ExternalCreation - err error + o managed.ExternalCreation + statusDiff func(mg resource.Managed) string + err error } cases := map[string]struct { @@ -39,17 +45,6 @@ func TestCreate(t *testing.T) { args args want want }{ - "Invalid managed resource": { - fields: fields{ - backendStore: backendstore.NewBackendStore(), - }, - args: args{ - mg: unexpectedItem, - }, - want: want{ - err: errors.New(errNotBucket), - }, - }, "S3 backends missing": { fields: fields{ backendStore: backendstore.NewBackendStore(), @@ -133,23 +128,241 @@ func TestCreate(t *testing.T) { err: errors.New(errNoS3BackendsStored), }, }, - } + "Create succeeds on single backend": { + fields: fields{ + providerConfigs: &apisv1alpha1.ProviderConfigList{ + Items: []apisv1alpha1.ProviderConfig{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-1", + }, + }, + }, + }, + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{} + fake.CreateBucketReturns( + &s3.CreateBucketOutput{}, + nil, + ) - pc := &apisv1alpha1.ProviderConfig{} - s := scheme.Scheme - s.AddKnownTypes(apisv1alpha1.SchemeGroupVersion, pc) + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + mg: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucket", + }, + }, + }, + want: want{ + err: nil, + statusDiff: func(mg resource.Managed) string { + bucket, _ := mg.(*v1alpha1.Bucket) + + return cmp.Diff( + v1alpha1.BucketStatus{ + ResourceStatus: v1.ResourceStatus{ + ConditionedStatus: v1.ConditionedStatus{ + Conditions: []v1.Condition{ + { + Type: "Ready", + Status: "True", + Reason: "Available", + }, + }, + }, + }, + AtProvider: v1alpha1.BucketObservation{ + Backends: v1alpha1.Backends{ + "s3-backend-1": &v1alpha1.BackendInfo{ + BucketStatus: v1alpha1.ReadyStatus, + }, + }, + }, + }, + bucket.Status, + ) + }, + }, + }, + "Create fails on two backends and succeeds on one": { + fields: fields{ + providerConfigs: &apisv1alpha1.ProviderConfigList{ + Items: []apisv1alpha1.ProviderConfig{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-2", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-3", + }, + }, + }, + }, + backendStore: func() *backendstore.BackendStore { + fakeClientError := backendstorefakes.FakeS3Client{} + fakeClientOK := backendstorefakes.FakeS3Client{} + + fakeClientError.CreateBucketReturns( + &s3.CreateBucketOutput{}, + errors.New("some error"), + ) + + fakeClientOK.CreateBucketReturns( + &s3.CreateBucketOutput{}, + nil, + ) + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fakeClientError, true, apisv1alpha1.HealthStatusHealthy) + bs.AddOrUpdateBackend("s3-backend-2", &fakeClientError, true, apisv1alpha1.HealthStatusHealthy) + bs.AddOrUpdateBackend("s3-backend-3", &fakeClientOK, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + mg: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucket", + }, + }, + }, + want: want{ + err: nil, + statusDiff: func(mg resource.Managed) string { + bucket, _ := mg.(*v1alpha1.Bucket) + + return cmp.Diff( + v1alpha1.BucketStatus{ + ResourceStatus: v1.ResourceStatus{ + ConditionedStatus: v1.ConditionedStatus{ + Conditions: []v1.Condition{ + { + Type: "Ready", + Status: "True", + Reason: "Available", + }, + }, + }, + }, + AtProvider: v1alpha1.BucketObservation{ + Backends: v1alpha1.Backends{ + "s3-backend-3": &v1alpha1.BackendInfo{ + BucketStatus: v1alpha1.ReadyStatus, + }, + }, + }, + }, + bucket.Status, + ) + }, + }, + }, + "Create fails on all backends": { + fields: fields{ + providerConfigs: &apisv1alpha1.ProviderConfigList{ + Items: []apisv1alpha1.ProviderConfig{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-2", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s3-backend-3", + }, + }, + }, + }, + backendStore: func() *backendstore.BackendStore { + fakeClientError := backendstorefakes.FakeS3Client{} + + fakeClientError.CreateBucketReturns( + &s3.CreateBucketOutput{}, + errors.New("some error"), + ) + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fakeClientError, true, apisv1alpha1.HealthStatusHealthy) + bs.AddOrUpdateBackend("s3-backend-2", &fakeClientError, true, apisv1alpha1.HealthStatusHealthy) + bs.AddOrUpdateBackend("s3-backend-3", &fakeClientError, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + mg: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucket", + }, + }, + }, + want: want{ + err: nil, + statusDiff: func(mg resource.Managed) string { + bucket, _ := mg.(*v1alpha1.Bucket) + + return cmp.Diff( + v1alpha1.BucketStatus{ + ResourceStatus: v1.ResourceStatus{ + ConditionedStatus: v1.ConditionedStatus{ + Conditions: []v1.Condition{ + { + Type: "Ready", + Status: "False", + Reason: "Unavailable", + }, + }, + }, + }, + }, + bucket.Status, + ) + }, + }, + }, + } for name, tc := range cases { tc := tc t.Run(name, func(t *testing.T) { - t.Parallel() + s := runtime.NewScheme() + s.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.Bucket{}, &v1alpha1.BucketList{}) + s.AddKnownTypes(apisv1alpha1.SchemeGroupVersion, &apisv1alpha1.ProviderConfig{}, &apisv1alpha1.ProviderConfigList{}) + + cl := fake.NewClientBuilder(). + WithScheme(s). + WithObjects(tc.args.mg). + WithStatusSubresource(tc.args.mg) + + if tc.fields.providerConfigs != nil { + cl.WithLists(tc.fields.providerConfigs) + } - cl := fake.NewClientBuilder().WithScheme(s).Build() e := external{ - kubeClient: cl, - backendStore: tc.fields.backendStore, - log: logging.NewNopLogger(), + kubeClient: cl.Build(), + backendStore: tc.fields.backendStore, + log: logging.NewNopLogger(), + operationTimeout: time.Second * 5, } got, err := e.Create(context.Background(), tc.args.mg) @@ -160,6 +373,11 @@ func TestCreate(t *testing.T) { if diff := cmp.Diff(tc.want.o, got); diff != "" { t.Errorf("\n%s\ne.Create(...): -want, +got:\n%s\n", tc.reason, diff) } + if tc.want.statusDiff != nil { + if diff := tc.want.statusDiff(tc.args.mg); diff != "" { + t.Errorf("\n%s\ne.Create(...): -want, +got:\n%s\n", tc.reason, diff) + } + } }) } } diff --git a/internal/s3/bucket.go b/internal/s3/bucket.go index d32296f0..b25b313d 100644 --- a/internal/s3/bucket.go +++ b/internal/s3/bucket.go @@ -217,10 +217,6 @@ func BucketExists(ctx context.Context, s3Backend backendstore.S3Client, bucketNa ctx, span := otel.Tracer("").Start(ctx, "BucketExists") defer span.End() - if cache.Exists(bucketName) { - return true, nil - } - _, err := s3Backend.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucketName)}) if err != nil { return false, resource.IgnoreAny(err, NoSuchBucket, IsNotFound)