Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added lower level S3 calls and tidy ups #112

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package healthcheck

import (
"context"
"fmt"
"strings"
"time"

Expand All @@ -27,6 +26,7 @@ import (
v1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/errors"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -50,7 +50,6 @@ const (
errPutHealthCheckFile = "failed to upload health check file"
errGetHealthCheckFile = "failed to get health check file"
errCreateHealthCheckBucket = "failed to create health check bucket"
errDoHealthCheck = "failed to perform health check"
errHealthCheckCleanup = "failed to perform health check cleanup"
errDeleteHealthCheckBucket = "failed to delete health check bucket"
errDeleteLCValidationBucket = "failed to delete lifecycle configuration validation bucket"
Expand All @@ -60,6 +59,7 @@ const (
healthCheckFile = "health-check-file"
)

//nolint:gocyclo,cyclop // Function requires multiple checks.
func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx, span := otel.Tracer("").Start(ctx, "healthcheck.Controller.Reconcile")
defer span.End()
Expand Down Expand Up @@ -120,15 +120,31 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
}()

s3BackendClient := c.backendStore.GetBackendClient(providerConfig.Name)
if s3BackendClient == nil {
err := errors.New(errBackendNotStored)
traces.SetAndRecordError(span, err)

return ctrl.Result{}, err
}

// Check the backend for the existence of the health check bucket.
bucketExists, err := s3internal.BucketExists(ctx, s3BackendClient, bucketName)
if err != nil {
providerConfig.Status.SetConditions(v1alpha1.HealthCheckFail().WithMessage(err.Error()))
traces.SetAndRecordError(span, err)

return ctrl.Result{}, err
}

// Create a health check bucket on the backend if one does not already exist.
if err := c.bucketExists(ctx, req.Name, bucketName); err != nil {
if err := c.createBucket(ctx, req.Name, bucketName); err != nil {
if !bucketExists {
_, err := s3internal.CreateBucket(ctx, s3BackendClient, &s3.CreateBucketInput{Bucket: aws.String(bucketName)})
if resource.Ignore(s3internal.IsAlreadyExists, err) != nil {
c.log.Info("Failed to create bucket for health check on s3 backend", consts.KeyBucketName, bucketName, consts.KeyBackendName, providerConfig.Name)

msg := fmt.Sprintf("failed to create health check bucket: %v", err.Error())
providerConfig.Status.SetConditions(v1alpha1.HealthCheckFail().WithMessage(msg))

err = errors.Wrap(err, errCreateHealthCheckBucket)
providerConfig.Status.SetConditions(v1alpha1.HealthCheckFail().WithMessage(err.Error()))
traces.SetAndRecordError(span, err)

return ctrl.Result{}, err
Expand All @@ -140,10 +156,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if err := c.doHealthCheck(ctx, providerConfig, bucketName); err != nil {
c.log.Info("Failed to do health check on s3 backend", consts.KeyBucketName, bucketName, consts.KeyBackendName, providerConfig.Name)

msg := errDoHealthCheck + ": " + err.Error()
providerConfig.Status.SetConditions(v1alpha1.HealthCheckFail().WithMessage(msg))

err = errors.Wrap(err, errDoHealthCheck)
providerConfig.Status.SetConditions(v1alpha1.HealthCheckFail().WithMessage(err.Error()))
traces.SetAndRecordError(span, err)

return ctrl.Result{}, err
Expand Down Expand Up @@ -177,19 +190,26 @@ func (c *Controller) cleanup(ctx context.Context, req ctrl.Request, bucketName s
return nil
}

c.log.Info("Deleting health check bucket", consts.KeyBucketName, bucketName, consts.KeyBackendName, req.Name)
g := new(errgroup.Group)
g.Go(func() error {
c.log.Info("Deleting health check bucket", consts.KeyBucketName, bucketName, consts.KeyBackendName, req.Name)
if err := s3internal.DeleteBucket(ctx, backendClient, aws.String(bucketName)); err != nil {
return errors.Wrap(err, errDeleteHealthCheckBucket)
}

if err := s3internal.DeleteBucket(ctx, backendClient, aws.String(bucketName)); err != nil {
return errors.Wrap(err, errDeleteHealthCheckBucket)
}
return nil
})

c.log.Info("Deleting lifecycle configuration validation bucket", consts.KeyBucketName, v1alpha1.LifecycleConfigValidationBucketName, consts.KeyBackendName, req.Name)
g.Go(func() error {
c.log.Info("Deleting lifecycle configuration validation bucket", consts.KeyBucketName, v1alpha1.LifecycleConfigValidationBucketName, consts.KeyBackendName, req.Name)
if err := s3internal.DeleteBucket(ctx, backendClient, aws.String(v1alpha1.LifecycleConfigValidationBucketName)); err != nil {
return errors.Wrap(err, errDeleteLCValidationBucket)
}

if err := s3internal.DeleteBucket(ctx, backendClient, aws.String(v1alpha1.LifecycleConfigValidationBucketName)); err != nil {
return errors.Wrap(err, errDeleteLCValidationBucket)
}
return nil
})

return nil
return g.Wait()
}

// doHealthCheck performs a PutObject and GetObject on the health check bucket on the backend.
Expand All @@ -202,21 +222,26 @@ func (c *Controller) doHealthCheck(ctx context.Context, providerConfig *apisv1al
return errors.New(errBackendNotStored)
}

_, putErr := s3BackendClient.PutObject(ctx, &s3.PutObjectInput{
if putErr := s3internal.PutObject(ctx, s3BackendClient, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(healthCheckFile),
Body: strings.NewReader(time.Now().Format(time.RFC850)),
})
if putErr != nil {
return errors.Wrap(putErr, errPutHealthCheckFile)
}); putErr != nil {
putErr = errors.Wrap(putErr, errPutHealthCheckFile)
traces.SetAndRecordError(span, putErr)

return putErr
}

_, getErr := s3BackendClient.GetObject(ctx, &s3.GetObjectInput{
_, getErr := s3internal.GetObject(ctx, s3BackendClient, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(healthCheckFile),
})
if getErr != nil {
return errors.Wrap(getErr, errGetHealthCheckFile)
getErr = errors.Wrap(getErr, errGetHealthCheckFile)
traces.SetAndRecordError(span, getErr)

return getErr
}

// Health check completed successfully, update status.
Expand All @@ -225,30 +250,6 @@ func (c *Controller) doHealthCheck(ctx context.Context, providerConfig *apisv1al
return nil
}

func (c *Controller) bucketExists(ctx context.Context, s3BackendName, bucketName string) error {
s3BackendClient := c.backendStore.GetBackendClient(s3BackendName)
if s3BackendClient == nil {
return errors.New(errBackendNotStored)
}

_, err := s3BackendClient.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(bucketName)})

return err
}

func (c *Controller) createBucket(ctx context.Context, s3BackendName, bucketName string) error {
s3BackendClient := c.backendStore.GetBackendClient(s3BackendName)
if s3BackendClient == nil {
return errors.New(errBackendNotStored)
}

_, err := s3BackendClient.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})

return resource.Ignore(s3internal.IsAlreadyExists, err)
}

// unpauseBuckets lists all buckets that exist on the given backend by using the custom
// backend label. Then, using retry.OnError(), it attempts to unpause each of these buckets
// by unsetting the Pause label.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ import (
func TestReconcile(t *testing.T) {
t.Parallel()
backendName := "test-backend"
lowerPutObjErr := errors.New("some put err")
putObjErr := errors.New("failed to put object")
lowerGetObjErr := errors.New("some get err")
getObjErr := errors.New("failed to get object")
lowerHeadBucketErr := errors.New("some head bucket err")
headBucketErr := errors.New("failed to perform head bucket")
lowerCreateBucketErr := errors.New("some create bucket err")
createBucketErr := errors.New("failed to create bucket")

type fields struct {
fakeS3Client func(*backendstorefakes.FakeS3Client)
Expand Down Expand Up @@ -139,13 +145,134 @@ func TestReconcile(t *testing.T) {
},
},
},
"ProviderConfig goes from healthy to unhealthy due to failed head bucket": {
fields: fields{
fakeS3Client: func(fake *backendstorefakes.FakeS3Client) {
// fail the health check with a HeadBucket error
fake.HeadBucketReturns(
&s3.HeadBucketOutput{},
lowerHeadBucketErr,
)
},
providerConfig: &apisv1alpha1.ProviderConfig{
ObjectMeta: metav1.ObjectMeta{
Name: backendName,
},
Spec: apisv1alpha1.ProviderConfigSpec{
DisableHealthCheck: false,
},
Status: apisv1alpha1.ProviderConfigStatus{
ProviderConfigStatus: xpv1.ProviderConfigStatus{
ConditionedStatus: xpv1.ConditionedStatus{
Conditions: []xpv1.Condition{
v1alpha1.HealthCheckSuccess(),
},
},
},
},
},
},
args: args{
req: ctrl.Request{
NamespacedName: types.NamespacedName{
Name: backendName,
},
},
},
want: want{
res: ctrl.Result{},
err: lowerHeadBucketErr,
pc: &apisv1alpha1.ProviderConfig{
ObjectMeta: metav1.ObjectMeta{
Name: backendName,
},
Spec: apisv1alpha1.ProviderConfigSpec{
DisableHealthCheck: false,
},
Status: apisv1alpha1.ProviderConfigStatus{
ProviderConfigStatus: xpv1.ProviderConfigStatus{
ConditionedStatus: xpv1.ConditionedStatus{
Conditions: []xpv1.Condition{
v1alpha1.HealthCheckFail().
WithMessage(errors.Wrap(lowerHeadBucketErr, headBucketErr.Error()).Error()),
},
},
},
},
},
},
},
"ProviderConfig goes from healthy to unhealthy due to failed create bucket": {
fields: fields{
fakeS3Client: func(fake *backendstorefakes.FakeS3Client) {
// HeadBucket returns not found so the bucket
// does not exist and must be created.
var notFoundError *s3types.NotFound
fake.HeadBucketReturns(
&s3.HeadBucketOutput{},
notFoundError,
)
// fail the health check with a CreateBucket error
fake.CreateBucketReturns(
&s3.CreateBucketOutput{},
lowerCreateBucketErr,
)
},
providerConfig: &apisv1alpha1.ProviderConfig{
ObjectMeta: metav1.ObjectMeta{
Name: backendName,
},
Spec: apisv1alpha1.ProviderConfigSpec{
DisableHealthCheck: false,
},
Status: apisv1alpha1.ProviderConfigStatus{
ProviderConfigStatus: xpv1.ProviderConfigStatus{
ConditionedStatus: xpv1.ConditionedStatus{
Conditions: []xpv1.Condition{
v1alpha1.HealthCheckSuccess(),
},
},
},
},
},
},
args: args{
req: ctrl.Request{
NamespacedName: types.NamespacedName{
Name: backendName,
},
},
},
want: want{
res: ctrl.Result{},
err: lowerCreateBucketErr,
pc: &apisv1alpha1.ProviderConfig{
ObjectMeta: metav1.ObjectMeta{
Name: backendName,
},
Spec: apisv1alpha1.ProviderConfigSpec{
DisableHealthCheck: false,
},
Status: apisv1alpha1.ProviderConfigStatus{
ProviderConfigStatus: xpv1.ProviderConfigStatus{
ConditionedStatus: xpv1.ConditionedStatus{
Conditions: []xpv1.Condition{
v1alpha1.HealthCheckFail().
WithMessage(errors.Wrap(errors.Wrap(lowerCreateBucketErr, createBucketErr.Error()), errCreateHealthCheckBucket).Error()),
},
},
},
},
},
},
},
"ProviderConfig goes from healthy to unhealthy due to failed put object": {
fields: fields{
fakeS3Client: func(fake *backendstorefakes.FakeS3Client) {
// fail the health check with a PutObject error
fake.PutObjectReturns(
&s3.PutObjectOutput{},
putObjErr,
lowerPutObjErr,
)
},
providerConfig: &apisv1alpha1.ProviderConfig{
Expand Down Expand Up @@ -175,7 +302,7 @@ func TestReconcile(t *testing.T) {
},
want: want{
res: ctrl.Result{},
err: putObjErr,
err: lowerPutObjErr,
pc: &apisv1alpha1.ProviderConfig{
ObjectMeta: metav1.ObjectMeta{
Name: backendName,
Expand All @@ -188,7 +315,7 @@ func TestReconcile(t *testing.T) {
ConditionedStatus: xpv1.ConditionedStatus{
Conditions: []xpv1.Condition{
v1alpha1.HealthCheckFail().
WithMessage(errDoHealthCheck + ": " + errors.Wrap(putObjErr, errPutHealthCheckFile).Error()),
WithMessage(errors.Wrap(errors.Wrap(lowerPutObjErr, putObjErr.Error()), errPutHealthCheckFile).Error()),
},
},
},
Expand All @@ -202,7 +329,7 @@ func TestReconcile(t *testing.T) {
// fail the health check with a GetObject error
fake.GetObjectReturns(
&s3.GetObjectOutput{},
getObjErr,
lowerGetObjErr,
)
},
providerConfig: &apisv1alpha1.ProviderConfig{
Expand Down Expand Up @@ -232,7 +359,7 @@ func TestReconcile(t *testing.T) {
},
want: want{
res: ctrl.Result{},
err: getObjErr,
err: lowerGetObjErr,
pc: &apisv1alpha1.ProviderConfig{
ObjectMeta: metav1.ObjectMeta{
Name: backendName,
Expand All @@ -245,7 +372,7 @@ func TestReconcile(t *testing.T) {
ConditionedStatus: xpv1.ConditionedStatus{
Conditions: []xpv1.Condition{
v1alpha1.HealthCheckFail().
WithMessage(errDoHealthCheck + ": " + errors.Wrap(getObjErr, errGetHealthCheckFile).Error()),
WithMessage(errors.Wrap(errors.Wrap(lowerGetObjErr, getObjErr.Error()), errGetHealthCheckFile).Error()),
},
},
},
Expand Down
Loading
Loading