Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Apply retries around the whole download operation #44

Merged
merged 26 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
40c5c27
Use the File's name throughout
TomBoam Jan 20, 2020
fd671b2
Use the term 'destination' to match name in store function
TomBoam Jan 20, 2020
2c3fd8d
Improve error handling by not ignoring the error when creating directory
TomBoam Jan 20, 2020
d219cb4
Extract method for creating file
TomBoam Jan 20, 2020
a42d051
Move file handling up into process function
TomBoam Jan 20, 2020
269d193
Switch the order of creating the file and getting it from S3.
TomBoam Jan 20, 2020
35cd956
Extract function for copying S3Object to a local File
TomBoam Jan 20, 2020
e384126
Convert readHEAD function to copy the contents to a temp file on disk…
TomBoam Jan 20, 2020
539d8ab
Extract function that does the work inside the retry loop
TomBoam Jan 20, 2020
212a6dc
Pass the file object down to where we will need it
TomBoam Jan 20, 2020
8e30471
Copy to file inside the retry loop
TomBoam Jan 20, 2020
a3da887
Remove some unnecessary code
TomBoam Jan 20, 2020
099d114
Reset the file if copying fails.
TomBoam Jan 20, 2020
dc3c417
Small tidies
TomBoam Jan 20, 2020
ce72f1e
Inline getObject method
TomBoam Jan 20, 2020
187df07
Handle errors resetting the file between retries
TomBoam Jan 20, 2020
9bd2bc1
Fix logging problem
TomBoam Jan 20, 2020
2c79da0
Use an S3Getter interface as a structural type to make testing easier
TomBoam Jan 21, 2020
df68a8a
Start building test
TomBoam Jan 21, 2020
85dd676
First test coming along
TomBoam Jan 21, 2020
187e18b
Test the happy case
TomBoam Jan 21, 2020
ddd7be8
Suite of tests to check various failure cases.
TomBoam Jan 21, 2020
c1ddd73
Make the time between retries configurable so I can lower it during t…
TomBoam Jan 22, 2020
3f4411d
Tidy up and fix consistency
TomBoam Jan 22, 2020
22050ad
Fix formatting
TomBoam Jan 22, 2020
bc198ef
Change error message to better reflect the failure case
TomBoam Jan 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 73 additions & 30 deletions cli/data/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package data

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
Expand All @@ -40,10 +40,11 @@ var (
getSubdir string
)

// s3RetriesSleep is the pause between S3 download retry attempts. It is a
// package-level var rather than a const so tests can shorten it.
var s3RetriesSleep = 10 * time.Second

const (
s3ParallelGets = 100
s3Retries = 10
s3RetriesSleep = 10 * time.Second
)

var getCmd = &cobra.Command{
Expand Down Expand Up @@ -111,17 +112,26 @@ func copyPathToDestination(source S3Path, destination string, keys []string, sub
}

func readHEAD(session *session.Session, source S3Path) string {
svc := s3.New(session)
tempFile, err := ioutil.TempFile("", "HEAD")
if err != nil {
exitErrorf("Unable to create temp file: %v", err)
}

out, err := getObject(svc, aws.String(source.bucket), aws.String(source.path))
defer os.Remove(tempFile.Name())

svc := s3.New(session)

err = copyS3ObjectToFile(svc, source, source.path, tempFile)
if err != nil {
exitErrorf("Error reading HEAD: %v", err)
TomBoam marked this conversation as resolved.
Show resolved Hide resolved
}

buf := new(bytes.Buffer)
buf.ReadFrom(out.Body)
return buf.String()
contents, err := ioutil.ReadFile(tempFile.Name())
if err != nil {
exitErrorf("Error reading HEAD file: %v", err)
}

return string(contents)
}

func parseDestination(destination string, subdir string) string {
Expand Down Expand Up @@ -219,58 +229,91 @@ func process(s3Client *s3.S3, src S3Path, basePath string, filePath string, sem
return
}

out, err := getObject(s3Client, aws.String(src.bucket), &filePath)
destination := basePath + "/" + strings.TrimPrefix(filePath, src.Dirname()+"/")
file, err := createFile(destination)
if err != nil {
exitErrorf("%v", err)
}
defer out.Body.Close()

target := basePath + "/" + strings.TrimPrefix(filePath, src.Dirname()+"/")
err = store(out, target)
defer file.Close()

err = copyS3ObjectToFile(s3Client, src, filePath, file)
if err != nil {
exitErrorf("%v", err)
}
}

func getObject(s3Client *s3.S3, bucket *string, key *string) (*s3.GetObjectOutput, error) {
var (
err error
out *s3.GetObjectOutput
)
// S3Getter is the subset of the S3 client API used when downloading objects.
// It is satisfied structurally by *s3.S3 and lets tests substitute stub
// implementations without talking to AWS.
type S3Getter interface {
	GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
}

func copyS3ObjectToFile(s3Client S3Getter, src S3Path, filePath string, file *os.File) error {
var err error

retries := s3Retries
for retries > 0 {
out, err = s3Client.GetObject(&s3.GetObjectInput{
Bucket: bucket,
Key: key,
})
err = tryGetObject(s3Client, aws.String(src.bucket), &filePath, file)
if err == nil {
return out, nil
// we're done
return nil
}

resetErr := resetFileForWriting(file)
if resetErr != nil {
fmt.Printf("Unable to download object from S3 (%s) and unable reset temp file to try again (%s)",
err,
resetErr)
return errors.Wrapf(resetErr, "unable to reset temp file %s", file.Name())
}
retries--
if retries > 0 {
fmt.Printf("Error fetching from S3: %s, (%s); will retry in %v... \n", *key, err.Error(), s3RetriesSleep)
fmt.Printf("Error fetching from S3: %s, (%s); will retry in %v... \n", filePath, err.Error(), s3RetriesSleep)
time.Sleep(s3RetriesSleep)
}
}
return nil, err
return err
}

func store(obj *s3.GetObjectOutput, destination string) error {
err := os.MkdirAll(filepath.Dir(destination), 0777)
func resetFileForWriting(file *os.File) error {
err := file.Truncate(0)
_, err = file.Seek(0, 0)
return err
}

func tryGetObject(s3Client S3Getter, bucket *string, key *string, file *os.File) error {
out, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: bucket,
Key: key,
})

file, err := os.Create(destination)
if err != nil {
return errors.Wrapf(err, "creating destination %s", destination)
return err
}
defer file.Close()

defer out.Body.Close()

return storeS3ObjectToFile(out, file)
}

func storeS3ObjectToFile(obj *s3.GetObjectOutput, file *os.File) error {
bytes, err := io.Copy(file, obj.Body)
if err != nil {
return errors.Wrapf(err, "copying file %s", destination)
return errors.Wrapf(err, "copying file %s", file.Name())
}

fmt.Printf("%s -> %d bytes\n", destination, bytes)
fmt.Printf("%s -> %d bytes\n", file.Name(), bytes)
return nil
}

// createFile makes sure the destination's parent directory exists and then
// creates (or truncates) the destination file, returning the open handle.
// The caller is responsible for closing the returned file.
func createFile(destination string) (*os.File, error) {
	dir := filepath.Dir(destination)
	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, errors.Wrapf(err, "creating directory %s", dir)
	}

	file, err := os.Create(destination)
	if err != nil {
		return nil, errors.Wrapf(err, "creating destination %s", destination)
	}
	return file, nil
}
169 changes: 168 additions & 1 deletion cli/data/get_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package data

import (
"errors"
"github.com/aws/aws-sdk-go/service/s3"
"io"
"io/ioutil"
"strings"
"testing"
"time"
)

func TestFilterObjects(t *testing.T) {
Expand Down Expand Up @@ -47,7 +52,7 @@ func TestFilterObjectsWithNoKeys(t *testing.T) {

func TestFilterObjectsUsingNonExistentKeys(t *testing.T) {
var (
key = "path/f1.csv"
key = "path/f1.csv"
obj = &s3.Object{Key: &key}
s3Path = S3Path{bucket: "bucket", path: "path/"}
keys = []string{"f2.csv", "f3.csv"}
Expand All @@ -62,3 +67,165 @@ func TestFilterObjectsUsingNonExistentKeys(t *testing.T) {
t.Error("It should return an error")
}
}

// s3GetterFromString is a stub S3Getter whose GetObject always succeeds,
// serving the fixed string s as the object body.
type s3GetterFromString struct {
	s string
}

// GetObject satisfies S3Getter by returning a readable body containing s.
func (g s3GetterFromString) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	body := ioutil.NopCloser(strings.NewReader(g.s))
	return &s3.GetObjectOutput{Body: body}, nil
}

// Test_copyS3ObjectToFile_worksFirstTime covers the happy path: the stub
// getter succeeds immediately and the object body ends up in the file.
func Test_copyS3ObjectToFile_worksFirstTime(t *testing.T) {
	var s3Client S3Getter = s3GetterFromString{"foobar"}

	s3Path := S3Path{bucket: "bucket", path: "path/"}
	filePath := "foo/bar"
	// Check the TempFile error instead of discarding it: a nil file here
	// would make the real failure look like a download bug.
	tempFile, err := ioutil.TempFile("", "testDownload")
	if err != nil {
		t.Fatalf("Unable to create temp file for test: %v", err)
	}

	err = copyS3ObjectToFile(s3Client, s3Path, filePath, tempFile)
	if err != nil {
		t.Errorf("Should have downloaded file successfully but didn't: %v", err)
	}

	bytes, err := ioutil.ReadFile(tempFile.Name())
	if err != nil {
		// Fatal: the contents comparison below is meaningless if the read failed.
		t.Fatalf("Should be able to read from 'downloaded' file but couldn't %v", err)
	}

	if string(bytes) != "foobar" {
		t.Errorf("File contents were incorrect. Expected '%s' but got '%s'", "foobar", string(bytes))
	}
}

// s3FailingGetter is a stub S3Getter that can never reach S3: every
// GetObject call fails.
type s3FailingGetter struct{}

// GetObject always returns an error and no output.
func (g *s3FailingGetter) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	return nil, errors.New("can't connect to S3")
}

// Test_copyS3ObjectToFile_failsToGetObjectFromS3 checks that an error is
// reported when every GetObject attempt fails.
func Test_copyS3ObjectToFile_failsToGetObjectFromS3(t *testing.T) {
	var s3Client S3Getter = &s3FailingGetter{}
	// Shorten the retry pause so the retry loop doesn't slow the test down.
	s3RetriesSleep = 1 * time.Second

	s3Path := S3Path{bucket: "bucket", path: "path/"}
	filePath := "foo/bar"
	// Check the TempFile error instead of discarding it.
	tempFile, err := ioutil.TempFile("", "testDownload")
	if err != nil {
		t.Fatalf("Unable to create temp file for test: %v", err)
	}

	err = copyS3ObjectToFile(s3Client, s3Path, filePath, tempFile)
	if err == nil {
		t.Errorf("Shouldn't have been able to download file successfully but did")
	}
}

// s3FailingReader is a stub S3Getter whose GetObject succeeds but whose
// returned body fails on every Read, simulating a broken download stream.
type s3FailingReader struct{}

// GetObject returns an output whose body errors as soon as it is read.
func (g *s3FailingReader) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	body := ioutil.NopCloser(&failingReader{})
	return &s3.GetObjectOutput{Body: body}, nil
}

// failingReader is an io.Reader that never yields data: every Read call
// reports an error without reading any bytes.
type failingReader struct{}

// Read always fails with a fixed error.
func (r *failingReader) Read(p []byte) (int, error) {
	err := errors.New("failing reader")
	return 0, err
}

// Test_copyS3ObjectToFile_failsToReadFromS3 checks that an error is reported
// when the object stream fails during reading on every retry.
func Test_copyS3ObjectToFile_failsToReadFromS3(t *testing.T) {
	var s3Client S3Getter = &s3FailingReader{}
	// Shorten the retry pause so the retry loop doesn't slow the test down.
	s3RetriesSleep = 1 * time.Second

	s3Path := S3Path{bucket: "bucket", path: "path/"}
	filePath := "foo/bar"
	// Check the TempFile error instead of discarding it.
	tempFile, err := ioutil.TempFile("", "testDownload")
	if err != nil {
		t.Fatalf("Unable to create temp file for test: %v", err)
	}

	err = copyS3ObjectToFile(s3Client, s3Path, filePath, tempFile)
	if err == nil {
		t.Errorf("Shouldn't have been able to download file successfully but did")
	}
}

// s3GetterFailOnClose is a stub S3Getter whose object body reads normally
// (serving the string s) but fails when the stream is closed.
type s3GetterFailOnClose struct {
	s string
}

// GetObject returns a body that serves s and errors on Close.
func (g *s3GetterFailOnClose) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	body := failOnClose{strings.NewReader(g.s)}
	return &s3.GetObjectOutput{Body: body}, nil
}

type failOnClose struct {
io.Reader
}

func (failOnClose) Close() error {
return errors.New("failed while closing")
}

// Test_copyS3ObjectToFile_failsWhenClosingStream was presumably intended to
// cover the case where closing the S3 object stream fails.
// NOTE(review): the getter constructed here is s3FailingReader (whose Read
// fails), not s3GetterFailOnClose, so the close-failure path is never
// actually exercised and the s3GetterFailOnClose/failOnClose mocks are dead
// code. Confirm whether the mock should be swapped here, and whether a
// Close error is even expected to propagate out of copyS3ObjectToFile.
func Test_copyS3ObjectToFile_failsWhenClosingStream(t *testing.T) {
	var s3Client S3Getter = &s3FailingReader{}
	// shortened so the retry loop doesn't make the test slow
	s3RetriesSleep = 1 * time.Second

	s3Path := S3Path{bucket: "bucket", path: "path/"}
	filePath := "foo/bar"
	// NOTE(review): the TempFile error is silently discarded here — verify.
	tempFile, _ := ioutil.TempFile("", "testDownload")

	err := copyS3ObjectToFile(s3Client, s3Path, filePath, tempFile)
	if err == nil {
		t.Errorf("Shouldn't have been able to download file successfully but did")
	}
}

// s3GetterFailsFirstFewAttempts is a stub S3Getter that serves bodies whose
// reads fail for the first unsuccessfulReads calls and then serves the
// string s successfully, letting tests exercise the retry loop end-to-end.
type s3GetterFailsFirstFewAttempts struct {
	unsuccessfulReads int
	s                 string
}

// GetObject counts down the configured failures: while any remain it returns
// a body that errors on Read; once exhausted it returns a readable body of s.
func (g *s3GetterFailsFirstFewAttempts) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	if g.unsuccessfulReads > 0 {
		g.unsuccessfulReads--
		return &s3.GetObjectOutput{Body: ioutil.NopCloser(&failingReader{})}, nil
	}
	return &s3.GetObjectOutput{Body: ioutil.NopCloser(strings.NewReader(g.s))}, nil
}

// Test_copyS3ObjectToFile_failsFirstFewReadAttemptsButRetries checks that the
// retry loop recovers when the first five attempts fail and a later attempt
// succeeds, and that the file ends up with the successful attempt's contents.
func Test_copyS3ObjectToFile_failsFirstFewReadAttemptsButRetries(t *testing.T) {
	var s3Client S3Getter = &s3GetterFailsFirstFewAttempts{5, "foobar"}
	// Shorten the retry pause so the retry loop doesn't slow the test down.
	s3RetriesSleep = 1 * time.Second

	s3Path := S3Path{bucket: "bucket", path: "path/"}
	filePath := "foo/bar"
	// Check the TempFile error instead of discarding it: a nil file here
	// would make the real failure look like a download bug.
	tempFile, err := ioutil.TempFile("", "testDownload")
	if err != nil {
		t.Fatalf("Unable to create temp file for test: %v", err)
	}

	err = copyS3ObjectToFile(s3Client, s3Path, filePath, tempFile)
	if err != nil {
		t.Errorf("Should have downloaded file successfully but didn't: %v", err)
	}

	bytes, err := ioutil.ReadFile(tempFile.Name())
	if err != nil {
		// Fatal: the contents comparison below is meaningless if the read failed.
		t.Fatalf("Should be able to read from 'downloaded' file but couldn't %v", err)
	}

	if string(bytes) != "foobar" {
		t.Errorf("File contents were incorrect. Expected '%s' but got '%s'", "foobar", string(bytes))
	}
}