Skip to content

Commit

Permalink
storage/swift: better retries
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Buchanan committed May 21, 2018
1 parent d8f8b15 commit 59a713c
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 49 deletions.
18 changes: 15 additions & 3 deletions cmd/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,23 @@ import (
"github.com/golang/protobuf/jsonpb"
cmdutil "github.com/ohsu-comp-bio/funnel/cmd/util"
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/storage"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/spf13/cobra"
)

var log = logger.NewLogger("storage", logger.DefaultConfig())

// newStorage builds the storage multiplexer described by conf and attaches
// the package-level logger to it before handing it back to the caller.
// Any error from storage.NewMux is returned unchanged.
func newStorage(conf config.Config) (storage.Storage, error) {
	mux, err := storage.NewMux(conf)
	if err != nil {
		return nil, err
	}

	// Attach the package logger so the mux can report through it.
	mux.AttachLogger(log)
	return mux, nil
}

// NewCommand returns the "storage" subcommands.
func NewCommand() *cobra.Command {

Expand Down Expand Up @@ -46,7 +58,7 @@ func NewCommand() *cobra.Command {
return cmd.Usage()
}

store, err := storage.NewMux(conf)
store, err := newStorage(conf)
if err != nil {
return fmt.Errorf("creating storage clients: %s", err)
}
Expand All @@ -73,7 +85,7 @@ func NewCommand() *cobra.Command {
return cmd.Usage()
}

store, err := storage.NewMux(conf)
store, err := newStorage(conf)
if err != nil {
return fmt.Errorf("creating storage clients: %s", err)
}
Expand Down Expand Up @@ -131,7 +143,7 @@ func NewCommand() *cobra.Command {
return cmd.Usage()
}

store, err := storage.NewMux(conf)
store, err := newStorage(conf)
if err != nil {
return fmt.Errorf("creating storage clients: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*wo
if err != nil {
return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err)
}
store.AttachLogger(log)

w := &worker.DefaultWorker{
Conf: conf.Worker,
Expand Down
2 changes: 1 addition & 1 deletion config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func DefaultConfig() Config {
},
},
Swift: SwiftStorage{
MaxRetries: 3,
MaxRetries: 20,
ChunkSizeBytes: int64(500 * units.MB),
},
}
Expand Down
14 changes: 14 additions & 0 deletions storage/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/logger"
)

// operation codes help multiplex storage operations across multiple backends.
Expand Down Expand Up @@ -167,6 +169,18 @@ func (mux *Mux) UnsupportedOperations(url string) UnsupportedOperations {
return unsupported
}

// AttachLogger wires the given logger into every backend of the mux that is
// a *Retrier, so that each retry attempt is reported as a warning together
// with the triggering error and the sleep duration before the next attempt.
// Backends of any other type are left untouched.
func (mux *Mux) AttachLogger(log *logger.Logger) {
	// One callback is shared by all retrying backends.
	warnOnRetry := func(err error, sleep time.Duration) {
		log.Warn("Retrying", "error", err, "sleep", sleep)
	}

	for _, backend := range mux.Backends {
		retrying, isRetrier := backend.(*Retrier)
		if !isRetrier {
			continue
		}
		retrying.Retrier.Notify = warnOnRetry
	}
}

func (mux *Mux) findBackend(url string, op operation) (Storage, error) {
var found = 0
var useBackend Storage
Expand Down
97 changes: 53 additions & 44 deletions storage/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,34 @@ func NewSwiftRetrier(conf config.SwiftStorage) (*Retrier, error) {
Backend: b,
Retrier: &util.Retrier{
MaxTries: conf.MaxRetries,
InitialInterval: time.Second * 5,
MaxInterval: time.Minute * 5,
InitialInterval: 500 * time.Millisecond,
MaxInterval: 5 * time.Minute,
Multiplier: 2.0,
RandomizationFactor: 0.5,
MaxElapsedTime: 0,
ShouldRetry: func(err error) bool {
// Retry on errors that swift names specifically.
if err == swift.ObjectCorrupted || err == swift.TimeoutError {
return true
}
// Retry on service unavailable.
if se, ok := err.(*swift.Error); ok {
return se.StatusCode == http.StatusServiceUnavailable
}
return false
},
ShouldRetry: shouldRetry,
},
}, nil
}

// shouldRetry reports whether a failed swift operation is worth retrying.
//
// Only errors wrapped in *swiftError are considered; anything else is
// treated as non-retryable. The wrapped cause is retryable when it is one of
// the sentinel errors swift.ObjectCorrupted or swift.TimeoutError, or when it
// is a *swift.Error carrying an HTTP 503 (Service Unavailable) status.
func shouldRetry(err error) bool {
	wrapped, isSwiftErr := err.(*swiftError)
	if !isSwiftErr {
		return false
	}
	cause := wrapped.err

	// Sentinel errors that swift names specifically are checked by identity
	// before the generic status-code check below.
	switch cause {
	case swift.ObjectCorrupted, swift.TimeoutError:
		return true
	}

	// Otherwise retry only when the service reports itself unavailable.
	apiErr, isAPIErr := cause.(*swift.Error)
	return isAPIErr && apiErr.StatusCode == http.StatusServiceUnavailable
}

// Stat returns metadata about the given url, such as checksum.
func (sw *Swift) Stat(ctx context.Context, url string) (*Object, error) {
u, err := sw.parse(url)
Expand All @@ -96,7 +104,7 @@ func (sw *Swift) Stat(ctx context.Context, url string) (*Object, error) {

info, _, err := sw.conn.Object(u.bucket, u.path)
if err != nil {
return nil, fmt.Errorf("getting object info: %s", err)
return nil, &swiftError{"getting object info", url, err}
}
return &Object{
URL: url,
Expand All @@ -118,7 +126,7 @@ func (sw *Swift) List(ctx context.Context, url string) ([]*Object, error) {
Prefix: u.path,
})
if err != nil {
return nil, fmt.Errorf("listing objects by prefix: %s", err)
return nil, &swiftError{"listing objects by prefix", url, err}
}

var objects []*Object
Expand All @@ -135,7 +143,7 @@ func (sw *Swift) List(ctx context.Context, url string) ([]*Object, error) {
}

// Get copies an object from storage to the host path.
func (sw *Swift) Get(ctx context.Context, url, path string) (obj *Object, err error) {
func (sw *Swift) Get(ctx context.Context, url, path string) (*Object, error) {
u, err := sw.parse(url)
if err != nil {
return nil, err
Expand All @@ -144,41 +152,33 @@ func (sw *Swift) Get(ctx context.Context, url, path string) (obj *Object, err er
var checkHash = true
var headers swift.Headers

obj, err = sw.Stat(ctx, url)
obj, err := sw.Stat(ctx, url)
if err != nil {
return
return nil, err
}

f, _, err := sw.conn.ObjectOpen(u.bucket, u.path, checkHash, headers)
if err != nil {
err = fmt.Errorf("initiating download: %s", err)
return
return nil, &swiftError{"initiating download", url, err}
}
defer func() {
cerr := f.Close()
if cerr != nil {
err = fmt.Errorf("closing file %v; %v", err, cerr)
}
}()
defer f.Close()

dest, err := os.Create(path)
if err != nil {
err = fmt.Errorf("creating file: %s", err)
return nil, &swiftError{"creating file", url, err}
}
defer func() {
cerr := dest.Close()
if cerr != nil {
err = fmt.Errorf("%v; %v", err, cerr)
}
}()

_, err = io.Copy(dest, fsutil.Reader(ctx, f))
if err != nil {
err = fmt.Errorf("copying file: %s", err)
return

_, copyErr := io.Copy(dest, fsutil.Reader(ctx, f))
closeErr := dest.Close()

if copyErr != nil {
// Report the copy failure itself: closeErr may be nil on this path, which
// would discard the real cause and produce a swiftError wrapping nil.
return nil, &swiftError{"copying file", url, copyErr}
}
if closeErr != nil {
return nil, &swiftError{"closing file", url, closeErr}
}

return
return obj, nil
}

// Put copies an object (file) from the host path to storage.
Expand All @@ -191,7 +191,7 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) {

reader, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("opening host file %q: %s", path, err)
return nil, &swiftError{"opening host file", url, err}
}
defer reader.Close()

Expand All @@ -216,7 +216,7 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) {
}

if err != nil {
return nil, fmt.Errorf("creating object: %s", err)
return nil, &swiftError{"creating object", url, err}
}

_, copyErr := io.Copy(writer, fsutil.Reader(ctx, reader))
Expand All @@ -225,10 +225,10 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) {
closeErr := writer.Close()

if copyErr != nil {
return nil, fmt.Errorf("copying file: %s", copyErr)
// Report the copy failure itself: closeErr may be nil on this path, which
// would discard the real cause and produce a swiftError wrapping nil.
return nil, &swiftError{"copying file", url, copyErr}
}
if closeErr != nil {
return nil, fmt.Errorf("closing file: %s", closeErr)
return nil, &swiftError{"closing file", url, closeErr}
}

return sw.Stat(ctx, url)
Expand All @@ -248,7 +248,7 @@ func (sw *Swift) UnsupportedOperations(url string) UnsupportedOperations {
}
_, _, err = sw.conn.Container(u.bucket)
if err != nil {
return AllUnsupported(fmt.Errorf("swift: failed to find bucket: %s. error: %v", u.bucket, err))
return AllUnsupported(&swiftError{"looking for bucket", url, err})
}
return AllSupported()
}
Expand All @@ -274,3 +274,12 @@ func (sw *Swift) parse(rawurl string) (*urlparts, error) {
}
return url, nil
}

// swiftError annotates an error from a swift storage operation with a short
// description of the step that failed and the URL being operated on.
//
// The field order (msg, url, err) is relied upon by positional composite
// literals elsewhere in the file and must not change.
type swiftError struct {
	// msg names the step that failed, e.g. "copying file".
	msg string
	// url is the storage URL the operation was acting on.
	url string
	// err is the underlying cause.
	err error
}

// Error implements the error interface, rendering the failed step, the
// quoted URL and the underlying cause in a single line.
func (e *swiftError) Error() string {
	const layout = "swift: %s for URL %q: %s"
	return fmt.Sprintf(layout, e.msg, e.url, e.err)
}
9 changes: 8 additions & 1 deletion util/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Retrier struct {
MaxElapsedTime time.Duration
MaxTries int
ShouldRetry func(err error) bool
Notify func(err error, d time.Duration)
backoff backoff.BackOff
}

Expand All @@ -36,7 +37,13 @@ func NewRetrier() *Retrier {
// Retry the function f until it does not return error or BackOff stops.
func (r *Retrier) Retry(ctx context.Context, f func() error) error {
b := backoff.WithContext(r.withTries(), ctx)
return backoff.Retry(func() error { return r.checkErr(f()) }, b)
return backoff.RetryNotify(func() error { return r.checkErr(f()) }, b, r.notify)
}

// notify forwards a retry notification (the error that triggered the retry
// and the delay d) to the optional Notify callback. It is a no-op when no
// callback has been set.
func (r *Retrier) notify(err error, d time.Duration) {
	callback := r.Notify
	if callback == nil {
		return
	}
	callback(err, d)
}

func (r *Retrier) checkErr(err error) error {
Expand Down

0 comments on commit 59a713c

Please sign in to comment.