Skip to content

Commit

Permalink
404 handling and DeleteObject implementations (GCS & S3) (grafana#2260)
Browse files Browse the repository at this point in the history
* add proper 404 handling and delete implementations to object client implementations

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>

* fix typo

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>
  • Loading branch information
jtlisi authored Mar 12, 2020
1 parent a2ba001 commit 21516ad
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
30 changes: 26 additions & 4 deletions aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
Expand Down Expand Up @@ -90,9 +91,23 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
// Stop fulfills the chunk.ObjectClient interface
func (a *S3ObjectClient) Stop() {}

func (a *S3ObjectClient) DeleteObject(ctx context.Context, chunkID string) error {
// ToDo: implement this to support deleting chunks from S3
return chunk.ErrMethodNotImplemented
// DeleteObject deletes the specified objectKey from the appropriate S3 bucket
func (a *S3ObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
_, err := a.S3.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == s3.ErrCodeNoSuchKey {
return chunk.ErrStorageObjectNotFound
}
}
return err
}

return nil
}

// bucketFromKey maps a key to a bucket name
Expand All @@ -108,7 +123,8 @@ func (a *S3ObjectClient) bucketFromKey(key string) string {
return a.bucketNames[hash%uint32(len(a.bucketNames))]
}

// Get object from the store
// GetObject returns a reader for the specified object key from the configured S3 bucket. If the
// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned.
func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
var resp *s3.GetObjectOutput

Expand All @@ -123,7 +139,13 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
})
return err
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, chunk.ErrStorageObjectNotFound
}
}
return nil, err
}

Expand Down
32 changes: 26 additions & 6 deletions gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func (s *GCSObjectClient) Stop() {
s.client.Close()
}

// Get object from the store
// GetObject returns a reader for the specified object key from the configured GCS bucket. If the
// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned.
func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
if s.cfg.RequestTimeout > 0 {
// The context will be cancelled with the timeout or when the parent context is cancelled, whichever occurs first.
Expand All @@ -73,10 +74,19 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
defer cancel()
}

return s.bucket.Object(objectKey).NewReader(ctx)
reader, err := s.bucket.Object(objectKey).NewReader(ctx)

if err != nil {
if err == storage.ErrObjectNotExist {
return nil, chunk.ErrStorageObjectNotFound
}
return nil, err
}

return reader, nil
}

// Put object into the store
// PutObject puts the specified bytes into the configured GCS bucket at the provided key
func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
writer := s.bucket.Object(objectKey).NewWriter(ctx)
// Default GCSChunkSize is 8M and for each call, 8M is allocated xD
Expand Down Expand Up @@ -128,7 +138,17 @@ func (s *GCSObjectClient) List(ctx context.Context, prefix string) ([]chunk.Stor
return storageObjects, nil
}

func (s *GCSObjectClient) DeleteObject(ctx context.Context, chunkID string) error {
// ToDo: implement this to support deleting chunks from GCS
return chunk.ErrMethodNotImplemented
// DeleteObject deletes the specified object key from the configured GCS bucket. If the
// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned.
func (s *GCSObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
err := s.bucket.Object(objectKey).Delete(ctx)

if err != nil {
if err == storage.ErrObjectNotExist {
return chunk.ErrStorageObjectNotFound
}
return err
}

return nil
}

0 comments on commit 21516ad

Please sign in to comment.