Skip to content

Commit

Permalink
gensupport: add retry for initial upload request (#528)
Browse files Browse the repository at this point in the history
* gensupport: add retry for initial upload request

This adds a retry for the initial request in a resumable
or multipart upload.

Fixes #392
  • Loading branch information
tritone authored Jun 16, 2020
1 parent 479f86b commit 077708b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 77 deletions.
33 changes: 18 additions & 15 deletions internal/gensupport/resumable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,6 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, e
// rx is private to the auto-generated API code.
// Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
var shouldRetry = func(status int, err error) bool {
if 500 <= status && status <= 599 {
return true
}
if status == statusTooManyRequests {
return true
}
if err == io.ErrUnexpectedEOF {
return true
}
if err, ok := err.(interface{ Temporary() bool }); ok {
return err.Temporary()
}
return false
}

// There are a couple of cases where it's possible for err and resp to both
// be non-nil. However, we expose a simpler contract to our callers: exactly
Expand Down Expand Up @@ -239,3 +224,21 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
return prepareReturn(resp, err)
}
}

// shouldRetry indicates whether an error is retryable for the purposes of this
// package.
func shouldRetry(status int, err error) bool {
if 500 <= status && status <= 599 {
return true
}
if status == statusTooManyRequests {
return true
}
if err == io.ErrUnexpectedEOF {
return true
}
if err, ok := err.(interface{ Temporary() bool }); ok {
return err.Temporary()
}
return false
}
51 changes: 0 additions & 51 deletions internal/gensupport/resumable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,57 +301,6 @@ func TestCancelUploadBasic(t *testing.T) {
}
}

func TestRetry_Bounded(t *testing.T) {
const (
chunkSize = 90
mediaSize = 300
)
media := strings.NewReader(strings.Repeat("a", mediaSize))

tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
{"bytes 0-89/*", http.StatusServiceUnavailable},
{"bytes 0-89/*", http.StatusServiceUnavailable},
},
bodies: bodyTracker{},
}

rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: func(int64) {},
}

oldRetryDeadline := retryDeadline
retryDeadline = time.Second
defer func() { retryDeadline = oldRetryDeadline }()

oldBackoff := backoff
backoff = func() Backoff { return new(PauseForeverBackoff) }
defer func() { backoff = oldBackoff }()

resCode := make(chan int, 1)
go func() {
resp, err := rx.Upload(context.Background())
if err != nil {
t.Error(err)
return
}
resCode <- resp.StatusCode
}()

select {
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for Upload to complete")
case got := <-resCode:
if want, got := http.StatusServiceUnavailable, got; got != want {
t.Fatalf("want %d, got %d", want, got)
}
}
}

func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
const (
chunkSize = 90
Expand Down
47 changes: 41 additions & 6 deletions internal/gensupport/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"net/http"
"time"
)

// Hook is the type of a function that is called once before each HTTP request
Expand Down Expand Up @@ -64,14 +65,48 @@ func send(ctx context.Context, client *http.Client, req *http.Request) (*http.Re
if client == nil {
client = http.DefaultClient
}
resp, err := client.Do(req.WithContext(ctx))
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err != nil {

var resp *http.Response
var err error

// Loop to retry the request, up to the context deadline.
var pause time.Duration
bo := backoff()

for {
select {
case <-ctx.Done():
err = ctx.Err()
default:
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err == nil {
err = ctx.Err()
}
return resp, err
case <-time.After(pause):
}

resp, err = client.Do(req.WithContext(ctx))

var status int
if resp != nil {
status = resp.StatusCode
}

// Check if we can retry the request. A retry can only be done if the error
// is retryable and the request body can be re-created using GetBody (this
// will not be possible if the body was unbuffered).
if req.GetBody == nil || !shouldRetry(status, err) {
break
}
var errBody error
req.Body, errBody = req.GetBody()
if errBody != nil {
break
}

pause = bo.Pause()
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}
return resp, err
Expand Down
5 changes: 0 additions & 5 deletions internal/gensupport/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,3 @@ func (bo *NoPauseBackoff) Pause() time.Duration { return 0 }
type PauseOneSecond struct{}

func (bo *PauseOneSecond) Pause() time.Duration { return time.Second }

// PauseForeverBackoff implements backoff with infinite 1h pauses.
type PauseForeverBackoff struct{}

func (bo *PauseForeverBackoff) Pause() time.Duration { return time.Hour }

0 comments on commit 077708b

Please sign in to comment.