Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): dynamic read request stall timeout #10958

Merged
merged 12 commits into from
Oct 14, 2024
104 changes: 86 additions & 18 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package storage

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
Expand All @@ -27,6 +29,7 @@ import (
"time"

"cloud.google.com/go/iam/apiv1/iampb"
"cloud.google.com/go/storage/experimental"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
Expand Down Expand Up @@ -948,7 +951,6 @@ func initEmulatorClients() func() error {
log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
return noopCloser
}

emulatorClients = map[string]storageClient{
"http": httpClient,
"grpc": grpcClient,
Expand Down Expand Up @@ -1335,10 +1337,14 @@ func TestObjectConditionsEmulated(t *testing.T) {
// Test that RetryNever prevents any retries from happening in both transports.
func TestRetryNeverEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
_, err := client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))
_, err = client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand All @@ -1354,12 +1360,16 @@ func TestRetryNeverEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until a timeout.
func TestRetryTimeoutEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand All @@ -1379,11 +1389,15 @@ func TestRetryTimeoutEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until max attempts.
func TestRetryMaxAttemptsEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand Down Expand Up @@ -1426,8 +1440,12 @@ func TestTimeoutErrorEmulated(t *testing.T) {
// Test that server-side DEADLINE_EXCEEDED errors are retried as expected with gRPC.
func TestRetryDeadlineExceedeEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
Expand All @@ -1436,17 +1454,61 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
})
}

// Test validates the retry for stalled read-request, when client is created with
// WithReadStallTimeout.
func TestRetryReadReqStallEmulated(t *testing.T) {
multiTransportTest(skipJSONReads(skipGRPC("not supported"), "not supported"), t, func(t *testing.T, ctx context.Context, project, _ string, client *Client) {
// Setup bucket and upload object.
bucket := fmt.Sprintf("http-bucket-%d", time.Now().Nanosecond())
if _, err := client.tc.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}, nil); err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

name, _, _, err := createObjectWithContent(ctx, bucket, randomBytes3MiB)
if err != nil {
t.Fatalf("createObject: %v", err)
}

// Plant stall at start for 2s.
instructions := map[string][]string{"storage.objects.get": {"stall-for-2s-after-0K"}}
testID := createRetryTest(t, client.tc, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

r, err := client.tc.NewRangeReader(ctx, &newRangeReaderParams{
bucket: bucket,
object: name,
gen: defaultGen,
offset: 0,
length: -1,
}, idempotent(true))
if err != nil {
t.Fatalf("NewRangeReader: %v", err)
}
defer r.Close()

buf := &bytes.Buffer{}
if _, err := io.Copy(buf, r); err != nil {
t.Fatalf("io.Copy: %v", err)
}
if !bytes.Equal(buf.Bytes(), randomBytes3MiB) {
t.Errorf("content does not match, got len %v, want len %v", buf.Len(), len(randomBytes3MiB))
}

}, experimental.WithReadStallTimeout(
&experimental.ReadStallTimeoutConfig{
TargetPercentile: 0.99,
Min: time.Second,
}))
}

// createRetryTest sets up a test using the Retry Test API for the given
// instructions and returns the resulting retry test ID. The caller is
// responsible for creating any bucket the test needs. This is intended for
// emulator tests of retry behavior that are not covered by conformance tests.
func createRetryTest(t *testing.T, project, bucket string, client storageClient, instructions map[string][]string) string {
func createRetryTest(t *testing.T, client storageClient, instructions map[string][]string) string {
t.Helper()
ctx := context.Background()

_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}

// Need the HTTP hostname to set up a retry test, as well as knowledge of
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// underlying transport to specify instructions.
Expand All @@ -1470,14 +1532,20 @@ func createRetryTest(t *testing.T, project, bucket string, client storageClient,
return et.id
}

// createObject creates an object in the emulator and returns its name, generation, and
// metageneration.
// createObject creates an object in the emulator with content randomBytesToWrite and
// returns its name, generation, and metageneration.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
return createObjectWithContent(ctx, bucket, randomBytesToWrite)
}

// createObjectWithContent creates an object in the emulator with the provided []byte contents,
// and returns its name, generation, and metageneration.
func createObjectWithContent(ctx context.Context, bucket string, bytes []byte) (string, int64, int64, error) {
prefix := time.Now().Nanosecond()
objName := fmt.Sprintf("%d-object", prefix)

w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
if _, err := w.Write(randomBytesToWrite); err != nil {
if _, err := w.Write(bytes); err != nil {
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
}
if err := w.Close(); err != nil {
Expand Down
64 changes: 64 additions & 0 deletions storage/experimental/experimental.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package experimental is a collection of experimental features that might
// have some rough edges to them. Housing experimental features in this package
// results in a user accessing these APIs as `experimental.Foo`, thereby making
// it explicit that the feature is experimental and using them in production
// code is at their own risk.
//
// All APIs in this package are experimental.
package experimental

import (
"time"

"cloud.google.com/go/storage/internal"
"google.golang.org/api/option"
)

// WithReadStallTimeout returns an [option.ClientOption] that may be passed to
// [storage.NewClient]. With this option set, the client retries requests that
// stall when starting a download from Cloud Storage: if the timeout elapses
// with no response from the server, the request is automatically retried.
//
// The timeout starts at ReadStallTimeoutConfig.Min. The client tracks latency
// across all of its read requests and can raise the timeout towards the target
// percentile when latency from the server is high.
//
// Currently this is supported only for downloads ([storage.NewReader] and
// [storage.NewRangeReader] calls) and only for the XML API. Other read APIs
// (gRPC & JSON) will be supported soon.
func WithReadStallTimeout(rstc *ReadStallTimeoutConfig) option.ClientOption {
	// TODO (raj-prince): Keep a separate dynamic timeout instance per BucketHandle.
	// Today the dynamic timeout lives on the client and is therefore shared by
	// every BucketHandle, which is not ideal: latency depends on the location of
	// both the VM and the bucket, so read latencies of different buckets may fall
	// in different ranges. Tracking the timeout at the BucketHandle level would
	// be better.
	newOption := internal.WithReadStallTimeout.(func(config *ReadStallTimeoutConfig) option.ClientOption)
	return newOption(rstc)
}

// ReadStallTimeoutConfig defines the timeout which is adjusted dynamically based on
// past observed latencies. It is the configuration accepted by WithReadStallTimeout.
type ReadStallTimeoutConfig struct {
// Min is the minimum duration of the timeout. The default value is 500ms. Requests
// taking shorter than this value to return response headers will never time out.
// In general, you should choose a Min value that is greater than the typical value
// for the target percentile.
Min time.Duration

// TargetPercentile is the percentile to target for the dynamic timeout, expressed
// as a fraction (0.99 means the 99th percentile). The default value is 0.99. At the
// default percentile, at most 1% of requests will be timed out and retried.
TargetPercentile float64
}
86 changes: 71 additions & 15 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
Expand All @@ -47,13 +48,14 @@ import (
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
// storageClient interface.
type httpStorageClient struct {
	// creds holds the Google credentials the client was constructed with.
	creds *google.Credentials
	// hc is the HTTP client used to issue requests.
	hc *http.Client
	// xmlHost is the host component of the configured endpoint URL.
	xmlHost string
	// raw is the underlying generated storage service.
	raw *raw.Service
	// scheme is the scheme component of the configured endpoint URL.
	scheme string
	// settings and config carry the options the client was created with.
	settings *settings
	config   *storageConfig
	// dynamicReadReqStallTimeout tracks the stall timeout applied to XML read
	// requests. It is nil unless a read stall timeout config was supplied, in
	// which case reads are issued without a stall timeout.
	dynamicReadReqStallTimeout *dynamicDelay
}

// newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
Expand Down Expand Up @@ -128,14 +130,29 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
}

var dd *dynamicDelay
if config.readStallTimeoutConfig != nil {
drrstConfig := config.readStallTimeoutConfig
dd, err = newDynamicDelay(
drrstConfig.TargetPercentile,
getDynamicReadReqIncreaseRateFromEnv(),
getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min),
drrstConfig.Min,
defaultDynamicReqdReqMaxTimeout)
if err != nil {
return nil, fmt.Errorf("creating dynamic-delay: %w", err)
}
}

return &httpStorageClient{
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
dynamicReadReqStallTimeout: dd,
}, nil
}

Expand Down Expand Up @@ -858,7 +875,46 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa
reopen := readerReopen(ctx, req.Header, params, s,
// Download function handed to readerReopen. It issues the XML read request
// and, when a dynamic stall timeout is configured, abandons the attempt with
// context.DeadlineExceeded if no response arrives within the current timeout
// so that the surrounding retry logic can try again.
func(ctx context.Context) (*http.Response, error) {
	setHeadersFromCtx(ctx, req.Header)

	// No stall timeout configured: issue the request directly.
	if c.dynamicReadReqStallTimeout == nil {
		return c.hc.Do(req.WithContext(ctx))
	}

	cancelCtx, cancel := context.WithCancel(ctx)
	var (
		res *http.Response
		err error
	)

	// done is closed by the goroutine once res and err hold their final
	// values. Closing (rather than sending on) the channel means the goroutine
	// can never block on it, and receiving from it is the synchronization
	// point that makes reading res and err below race-free.
	done := make(chan struct{})
	go func() {
		defer close(done)
		reqStartTime := time.Now()
		res, err = c.hc.Do(req.WithContext(cancelCtx))
		if err == nil {
			reqLatency := time.Since(reqStartTime)
			c.dynamicReadReqStallTimeout.update(reqLatency)
		} else if errors.Is(err, context.Canceled) {
			// context.Canceled means operation took more than current dynamicTimeout,
			// hence should be increased.
			c.dynamicReadReqStallTimeout.increase()
		}
	}()

	// Read the timeout once so the value that is logged is the value that
	// was actually waited for.
	stallTimeout := c.dynamicReadReqStallTimeout.getValue()
	timer := time.NewTimer(stallTimeout)
	defer timer.Stop()

	// Wait until timeout or request is successful.
	select {
	case <-timer.C:
		log.Printf("stalled read-req cancelled after %fs", stallTimeout.Seconds())
		cancel()
		// Wait for the in-flight request to observe the cancellation before
		// touching res: the goroutine may still be writing it. This also
		// guarantees the goroutine has exited rather than leaking.
		<-done
		if res != nil && res.Body != nil {
			// The response raced with the timeout; release its connection.
			res.Body.Close()
		}
		return nil, context.DeadlineExceeded
	case <-done:
		// The response body is still read by the caller under cancelCtx, so
		// the context must not be cancelled here. Clearing the variable
		// records that the cancel func is intentionally dropped.
		cancel = nil
	}
	return res, err
},
func() error { return setConditionsHeaders(req.Header, params.conds) },
func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
Expand Down
2 changes: 0 additions & 2 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
itesting "google.golang.org/api/iterator/testing"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -93,7 +92,6 @@ var (
)

func TestMain(m *testing.M) {
grpc.EnableTracing = true
cleanup := initIntegrationTest()
cleanupEmulatorClients := initEmulatorClients()
exit := m.Run()
Expand Down
Loading
Loading