Skip to content

Commit

Permalink
Add unit test cases and minor updates to Create (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolancon authored Dec 5, 2023
1 parent 0ba5a41 commit da39a7d
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 65 deletions.
71 changes: 36 additions & 35 deletions internal/controller/bucket/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
defer cancel()

if bucket.Spec.Disabled {
c.log.Info("Bucket is disabled - no buckets to be created on backends", "bucket name", bucket.Name)
c.log.Info("Bucket is disabled - no buckets to be created on backends", "bucket_name", bucket.Name)

return managed.ExternalCreation{}, nil
}
Expand All @@ -50,8 +50,10 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
return managed.ExternalCreation{}, errors.New(errMissingS3Backend)
}

updated := atomic.Bool{}
errorsLeft := 0
// This value shows a bucket on one backend is already created.
// It is used to prevent goroutines from sending duplicated messages to `readyChan`.
bucketAlreadyCreated := atomic.Bool{}
backendCount := 0
errChan := make(chan error, len(activeBackends))
readyChan := make(chan string)

Expand All @@ -60,21 +62,22 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext

cl := c.backendStore.GetBackendClient(beName)
if cl == nil {
c.log.Info("Backend client not found for backend - bucket cannot be created on backend", "bucket name", originalBucket.Name, "backend name", beName)
c.log.Info("Backend client not found for backend - bucket cannot be created on backend", "bucket_name", originalBucket.Name, "backend_name", beName)

continue
}

c.log.Info("Creating bucket", "bucket name", originalBucket.Name, "backend name", beName)
c.log.Info("Creating bucket on backend", "bucket_name", originalBucket.Name, "backend_name", beName)

pc := &apisv1alpha1.ProviderConfig{}
if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: beName}, pc); err != nil {
c.log.Info("Failed to fetch provider config", "backend name", beName, "bucket_name", originalBucket.Name, "err", err.Error())
c.log.Info("Failed to fetch provider config", "bucket_name", originalBucket.Name, "backend_name", beName, "err", err.Error())

return managed.ExternalCreation{}, errors.Wrap(err, errGetPC)
}

errorsLeft++
// Increment the backend counter. We need this later to know when the operation should finish.
backendCount++

beName := beName
go func() {
Expand All @@ -83,53 +86,59 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
for i := 0; i < s3internal.RequestRetries; i++ {
_, err = s3internal.CreateBucket(ctx, cl, s3internal.BucketToCreateBucketInput(originalBucket))
if resource.Ignore(s3internal.IsAlreadyExists, err) == nil {
c.log.Info("Bucket created on backend", "bucket_name", bucket.Name, "backend_name", beName)

break
}
}
if err != nil {
c.log.Info("Failed to create bucket on backend", "bucket_name", originalBucket.Name, "backend_name", beName, "err", err.Error())

if !updated.CompareAndSwap(false, true) {
c.log.Info("Bucket already updated", "bucket_name", originalBucket.Name)

errChan <- nil
errChan <- err

return
}

if err != nil {
c.log.Info("Failed to create bucket on backend", "backend name", beName, "bucket_name", originalBucket.Name, "err", err.Error())
// This compare-and-swap operation is the atomic equivalent of:
// if *bucketAlreadyCreated == false {
// *bucketAlreadyCreated = true
// return true
// }
// return false
if !bucketAlreadyCreated.CompareAndSwap(false, true) {
c.log.Info("Bucket already created on backend - terminate thread without error", "bucket_name", originalBucket.Name, "backend_name", beName)

errChan <- err
errChan <- nil

return
}

// Once a bucket is created successfully on ANY backend, the bucket is considered ready.
// Therefore we send the name of the backend on which the bucket is first created to the ready channel.
readyChan <- beName
errChan <- nil
}()
}

if errorsLeft == 0 {
if backendCount == 0 {
c.log.Info("Failed to find any backend for bucket", "bucket_name", bucket.Name)

return managed.ExternalCreation{}, nil
}

return c.waitForCreationAndUpdateObject(ctx, bucket, readyChan, errChan, errorsLeft)
return c.waitForCreationAndUpdateBucketCR(ctx, bucket, readyChan, errChan, backendCount)
}

func (c *external) waitForCreationAndUpdateObject(ctx context.Context, bucket *v1alpha1.Bucket, readyChan <-chan string, errChan <-chan error, errorsLeft int) (managed.ExternalCreation, error) {
func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket *v1alpha1.Bucket, readyChan <-chan string, errChan <-chan error, backendCount int) (managed.ExternalCreation, error) {
var err error

WAIT:
for {
for i := 0; i < backendCount; i++ {
select {
case <-ctx.Done():
c.log.Info("Context timeout", "bucket_name", bucket.Name)
c.log.Info("Context timeout waiting for bucket creation", "bucket_name", bucket.Name)

return managed.ExternalCreation{}, ctx.Err()
case beName := <-readyChan:
c.log.Info("Bucket created", "backend name", beName, "bucket_name", bucket.Name)

err := c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
// Remove the annotation, because Crossplane is not always able to do it.
// This workaround doesn't eliminates the problem, if this update fails,
Expand All @@ -154,25 +163,17 @@ WAIT:
return NeedsStatusUpdate
})
if err != nil {
c.log.Info("Failed to update Bucket CR", "backend name", beName, "bucket_name", bucket.Name)
c.log.Info("Failed to update Bucket CR with backend info", "bucket_name", bucket.Name, "backend_name", beName, "err", err.Error())
}

return managed.ExternalCreation{}, err
case err = <-errChan:
errorsLeft--

if err != nil {
c.log.Info("Failed to create on backend", "bucket_name", bucket.Name, "err", err.Error())

if errorsLeft > 0 {
continue
}

break WAIT
}
case <-errChan:
continue
}
}

c.log.Info("Failed to create bucket on any backend", "bucket_name", bucket.Name)

err = c.updateBucketCR(ctx, bucket, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
bucketLatest.Status.SetConditions(xpv1.Unavailable())

Expand Down
Loading

0 comments on commit da39a7d

Please sign in to comment.