Skip to content

Commit

Permalink
adding cleanup function to batch objects (#1375)
Browse files Browse the repository at this point in the history
  • Loading branch information
xibz authored Jun 30, 2017
1 parent 7238b75 commit c51a820
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 60 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
### SDK Features

### SDK Enhancements
* `service/s3/s3manager`: adding cleanup function to batch objects [#1375](https://github.com/aws/aws-sdk-go/issues/1375)
* This enhancement will add an After field that will be called after each iteration of the batch operation.

### SDK Bugs
40 changes: 30 additions & 10 deletions service/s3/s3manager/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package s3manager

import (
"bytes"
"fmt"
"io"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -43,8 +44,16 @@ type Error struct {
Key *string
}

func newError(err error, bucket, key *string) Error {
return Error{
err,
bucket,
key,
}
}

func (err *Error) Error() string {
return err.OrigErr.Error()
return fmt.Sprintf("failed to upload %q to %q:\n%s", err.Key, err.Bucket, err.OrigErr.Error())
}

// NewBatchError will return a BatchError that satisfies the awserr.Error interface.
Expand Down Expand Up @@ -239,6 +248,9 @@ func NewBatchDelete(c client.ConfigProvider, options ...func(*BatchDelete)) *Bat
// BatchDeleteObject is a wrapper object for calling the batch delete operation.
type BatchDeleteObject struct {
Object *s3.DeleteObjectInput
// After will run after each iteration during the batch process. This function will
// be executed whether or not the request was successful.
After func() error
}

// DeleteObjectsIterator is an interface that uses the scanner pattern to iterate
Expand Down Expand Up @@ -277,15 +289,17 @@ func (iter *DeleteObjectsIterator) DeleteObject() BatchDeleteObject {
func (d *BatchDelete) Delete(ctx aws.Context, iter BatchDeleteIterator) error {
var errs []Error
for iter.Next() {
object := iter.DeleteObject().Object
if _, err := d.Client.DeleteObjectWithContext(ctx, object); err != nil {
s3Err := Error{
OrigErr: err,
Bucket: object.Bucket,
Key: object.Key,
}

errs = append(errs, s3Err)
object := iter.DeleteObject()
if _, err := d.Client.DeleteObjectWithContext(ctx, object.Object); err != nil {
errs = append(errs, newError(err, object.Object.Bucket, object.Object.Key))
}

if object.After == nil {
continue
}

if err := object.After(); err != nil {
errs = append(errs, newError(err, object.Object.Bucket, object.Object.Key))
}
}

Expand All @@ -307,6 +321,9 @@ type BatchDownloadIterator interface {
type BatchDownloadObject struct {
Object *s3.GetObjectInput
Writer io.WriterAt
// After will run after each iteration during the batch process. This function will
// be executed whether or not the request was successful.
After func() error
}

// DownloadObjectsIterator implements the BatchDownloadIterator interface and allows for batched
Expand Down Expand Up @@ -382,4 +399,7 @@ func (batcher *UploadObjectsIterator) UploadObject() BatchUploadObject {
// BatchUploadObject contains all necessary information to run a batch operation once.
type BatchUploadObject struct {
Object *UploadInput
// After will run after each iteration during the batch process. This function will
// be executed whether or not the request was successful.
After func() error
}
249 changes: 209 additions & 40 deletions service/s3/s3manager/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,9 @@ func TestBatchUpload(t *testing.T) {

type mockClient struct {
s3iface.S3API
index int
Put func() (*s3.PutObjectOutput, error)
Get func() (*s3.GetObjectOutput, error)
List func() (*s3.ListObjectsOutput, error)
responses []response
}

Expand All @@ -457,37 +459,25 @@ type response struct {
}

func (client *mockClient) PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
resp := client.responses[client.index]
client.index++
return resp.out.(*s3.PutObjectOutput), resp.err
return client.Put()
}

func (client *mockClient) PutObjectRequest(input *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) {
resp := client.responses[client.index]
req, _ := client.S3API.PutObjectRequest(input)
req.Handlers.Clear()
req.Data = resp.out
req.Error = resp.err

client.index++
return req, resp.out.(*s3.PutObjectOutput)
req.Data, req.Error = client.Put()
return req, req.Data.(*s3.PutObjectOutput)
}

func (client *mockClient) ListObjects(input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
resp := client.responses[client.index]
client.index++
return resp.out.(*s3.ListObjectsOutput), resp.err
return client.List()
}

func (client *mockClient) ListObjectsRequest(input *s3.ListObjectsInput) (*request.Request, *s3.ListObjectsOutput) {
resp := client.responses[client.index]
req, _ := client.S3API.ListObjectsRequest(input)
req.Handlers.Clear()
req.Data = resp.out
req.Error = resp.err

client.index++
return req, resp.out.(*s3.ListObjectsOutput)
req.Data, req.Error = client.List()
return req, req.Data.(*s3.ListObjectsOutput)
}

func TestBatchError(t *testing.T) {
Expand All @@ -500,26 +490,38 @@ func TestBatchError(t *testing.T) {
Region: aws.String("foo"),
Credentials: credentials.NewStaticCredentials("AKID", "SECRET", "SESSION"),
})

index := 0
responses := []response{
{
&s3.PutObjectOutput{},
errors.New("Foo"),
},
{
&s3.PutObjectOutput{},
nil,
},
{
&s3.PutObjectOutput{},
nil,
},
{
&s3.PutObjectOutput{},
errors.New("Bar"),
},
}

svc := &mockClient{
s3.New(sess),
0,
[]response{
{
&s3.PutObjectOutput{},
errors.New("Foo"),
},
{
&s3.PutObjectOutput{},
nil,
},
{
&s3.PutObjectOutput{},
nil,
},
{
&s3.PutObjectOutput{},
errors.New("Bar"),
},
S3API: s3.New(sess),
Put: func() (*s3.PutObjectOutput, error) {
resp := responses[index]
index++
return resp.out.(*s3.PutObjectOutput), resp.err
},
List: func() (*s3.ListObjectsOutput, error) {
resp := responses[index]
index++
return resp.out.(*s3.ListObjectsOutput), resp.err
},
}
uploader := NewUploaderWithClient(svc)
Expand Down Expand Up @@ -590,8 +592,175 @@ func TestBatchError(t *testing.T) {
t.Error("Expected error, but received nil")
}

if svc.index != len(objects) {
t.Errorf("Expected %d, but received %d", len(objects), svc.index)
if index != len(objects) {
t.Errorf("Expected %d, but received %d", len(objects), index)
}

}

type testAfterDeleteIter struct {
afterDelete bool
afterDownload bool
afterUpload bool
next bool
}

func (iter *testAfterDeleteIter) Next() bool {
next := !iter.next
iter.next = !iter.next
return next
}

func (iter *testAfterDeleteIter) Err() error {
return nil
}

func (iter *testAfterDeleteIter) DeleteObject() BatchDeleteObject {
return BatchDeleteObject{
Object: &s3.DeleteObjectInput{
Bucket: aws.String("foo"),
Key: aws.String("foo"),
},
After: func() error {
iter.afterDelete = true
return nil
},
}
}

type testAfterDownloadIter struct {
afterDownload bool
afterUpload bool
next bool
}

func (iter *testAfterDownloadIter) Next() bool {
next := !iter.next
iter.next = !iter.next
return next
}

func (iter *testAfterDownloadIter) Err() error {
return nil
}

func (iter *testAfterDownloadIter) DownloadObject() BatchDownloadObject {
return BatchDownloadObject{
Object: &s3.GetObjectInput{
Bucket: aws.String("foo"),
Key: aws.String("foo"),
},
Writer: aws.NewWriteAtBuffer([]byte{}),
After: func() error {
iter.afterDownload = true
return nil
},
}
}

type testAfterUploadIter struct {
afterUpload bool
next bool
}

func (iter *testAfterUploadIter) Next() bool {
next := !iter.next
iter.next = !iter.next
return next
}

func (iter *testAfterUploadIter) Err() error {
return nil
}

func (iter *testAfterUploadIter) UploadObject() BatchUploadObject {
return BatchUploadObject{
Object: &UploadInput{
Bucket: aws.String("foo"),
Key: aws.String("foo"),
Body: strings.NewReader("bar"),
},
After: func() error {
iter.afterUpload = true
return nil
},
}
}

func TestAfter(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))

sess := session.New(&aws.Config{
Endpoint: &server.URL,
S3ForcePathStyle: aws.Bool(true),
Region: aws.String("foo"),
Credentials: credentials.NewStaticCredentials("AKID", "SECRET", "SESSION"),
})

index := 0
responses := []response{
{
&s3.PutObjectOutput{},
nil,
},
{
&s3.GetObjectOutput{},
nil,
},
{
&s3.DeleteObjectOutput{},
nil,
},
}

svc := &mockClient{
S3API: s3.New(sess),
Put: func() (*s3.PutObjectOutput, error) {
resp := responses[index]
index++
return resp.out.(*s3.PutObjectOutput), resp.err
},
Get: func() (*s3.GetObjectOutput, error) {
resp := responses[index]
index++
return resp.out.(*s3.GetObjectOutput), resp.err
},
List: func() (*s3.ListObjectsOutput, error) {
resp := responses[index]
index++
return resp.out.(*s3.ListObjectsOutput), resp.err
},
}
uploader := NewUploaderWithClient(svc)
downloader := NewDownloaderWithClient(svc)
deleter := NewBatchDeleteWithClient(svc)

deleteIter := &testAfterDeleteIter{}
downloadIter := &testAfterDownloadIter{}
uploadIter := &testAfterUploadIter{}

if err := uploader.UploadWithIterator(aws.BackgroundContext(), uploadIter); err != nil {
t.Error(err)
}

if err := downloader.DownloadWithIterator(aws.BackgroundContext(), downloadIter); err != nil {
t.Error(err)
}

if err := deleter.Delete(aws.BackgroundContext(), deleteIter); err != nil {
t.Error(err)
}

if !deleteIter.afterDelete {
t.Error("Expected 'afterDelete' to be true, but received false")
}

if !downloadIter.afterDownload {
t.Error("Expected 'afterDownload' to be true, but received false")
}

if !uploadIter.afterUpload {
t.Error("Expected 'afterUpload' to be true, but received false")
}
}
Loading

0 comments on commit c51a820

Please sign in to comment.