Skip to content

Commit

Permalink
Cleanup even if context is canceled.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostlevels committed Nov 7, 2024
1 parent d1d4795 commit 65477e5
Showing 1 changed file with 73 additions and 30 deletions.
103 changes: 73 additions & 30 deletions blob/service/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/md5"
"encoding/base64"
stdErrs "errors"
"io"
"time"

"github.com/tidepool-org/platform/blob"
blobStoreStructured "github.com/tidepool-org/platform/blob/store/structured"
Expand All @@ -19,6 +21,11 @@ import (
structureValidator "github.com/tidepool-org/platform/structure/validator"
)

const (
// arbritrary timeout value for cleanup operations such as deleting from S3 or Repo
defaultCleanupTimeout = time.Second * 5
)

type Provider interface {
BlobStructuredStore() blobStoreStructured.Store
BlobUnstructuredStore() blobStoreUnstructured.Store
Expand Down Expand Up @@ -70,31 +77,40 @@ func (c *Client) Create(ctx context.Context, userID string, content *blob.Conten
options.MediaType = content.MediaType
err = c.BlobUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options)
if err != nil {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
return nil
})
return nil, err
}

size := sizer.Size
if size > blob.SizeMaximum {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
return nil
})
return nil, request.ErrorResourceTooLarge()
}

digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.BlobUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
return nil
})
return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5"))
}

Expand Down Expand Up @@ -129,31 +145,41 @@ func (c *Client) CreateDeviceLogs(ctx context.Context, userID string, content *b
options.MediaType = content.MediaType
err = c.DeviceLogsUnstructuredStore().Put(ctx, userID, *result.ID, io.TeeReader(io.TeeReader(io.LimitReader(content.Body, blob.SizeMaximum+1), hasher), sizer), options)
if err != nil {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob after failure to put blob content")
}
return nil
})
return nil, err
}

size := sizer.Size
if size > blob.SizeMaximum {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content exceeding maximum size")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob exceeding maximum size")
}
return nil
})
return nil, request.ErrorResourceTooLarge()
}

digestMD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
if content.DigestMD5 != nil && *content.DigestMD5 != digestMD5 {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
doDoneCtx(ctx, defaultCleanupTimeout, func(ctx context.Context) error {
if _, deleteErr := c.DeviceLogsUnstructuredStore().Delete(ctx, userID, *result.ID); deleteErr != nil {
logger.WithError(deleteErr).Error("Unable to delete blob content with incorrect MD5 digest")
}
if _, destroyErr := repository.Destroy(ctx, *result.ID, nil); destroyErr != nil {
logger.WithError(destroyErr).Error("Unable to destroy blob with incorrect MD5 digest")
}
return nil
})

return nil, errors.WithSource(request.ErrorDigestsNotEqual(*content.DigestMD5, digestMD5), structure.NewPointerSource().WithReference("digestMD5"))
}

Expand Down Expand Up @@ -287,3 +313,20 @@ func (s *SizeWriter) Write(bites []byte) (int, error) {
s.Size += length
return length, nil
}

// doDoneCtx performs an action given a context even if the context is
// canceled or timed out. This is used if we have any cleanup functions that we
// still want to perform and passing the parent context would time out any
// child contexts.
func doDoneCtx(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error {
newContext := ctx
var cancel context.CancelFunc
select {
case <-ctx.Done():
if stdErrs.Is(ctx.Err(), context.Canceled) || stdErrs.Is(ctx.Err(), context.DeadlineExceeded) {
newContext, cancel = context.WithTimeout(context.WithoutCancel(ctx), timeout)
defer cancel()
}
}
return fn(newContext)
}

0 comments on commit 65477e5

Please sign in to comment.