Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): dynamic read request stall timeout #10958

Merged
merged 12 commits into from
Oct 14, 2024
103 changes: 85 additions & 18 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package storage

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
Expand Down Expand Up @@ -948,7 +950,6 @@ func initEmulatorClients() func() error {
log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
return noopCloser
}

emulatorClients = map[string]storageClient{
"http": httpClient,
"grpc": grpcClient,
Expand Down Expand Up @@ -1335,10 +1336,14 @@ func TestObjectConditionsEmulated(t *testing.T) {
// Test that RetryNever prevents any retries from happening in both transports.
func TestRetryNeverEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
_, err := client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))
_, err = client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand All @@ -1354,12 +1359,16 @@ func TestRetryNeverEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until a timeout.
func TestRetryTimeoutEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand All @@ -1379,11 +1388,15 @@ func TestRetryTimeoutEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until max attempts.
func TestRetryMaxAttemptsEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
if errors.As(err, &ae) {
Expand Down Expand Up @@ -1426,8 +1439,12 @@ func TestTimeoutErrorEmulated(t *testing.T) {
// Test that server-side DEADLINE_EXCEEDED errors are retried as expected with gRPC.
func TestRetryDeadlineExceedeEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
Expand All @@ -1436,17 +1453,61 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
})
}

// Test validates the retry for stalled read-request, when client is created with
// WithDynamicReadReqStallTimeout.
func TestRetryReadReqStallEmulated(t *testing.T) {
multiTransportTest(skipJSONReads(skipGRPC("not supported"), "not supported"), t, func(t *testing.T, ctx context.Context, project, _ string, client *Client) {
// Setup bucket and upload object.
bucket := fmt.Sprintf("http-bucket-%d", time.Now().Nanosecond())
if _, err := client.tc.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}, nil); err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

name, _, _, err := createObjectWithContent(ctx, bucket, randomBytes3MiB)
if err != nil {
t.Fatalf("createObject: %v", err)
}

// Plant stall at start for 2s.
instructions := map[string][]string{"storage.objects.get": {"stall-for-2s-after-0K"}}
testID := createRetryTest(t, client.tc, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

r, err := client.tc.NewRangeReader(ctx, &newRangeReaderParams{
bucket: bucket,
object: name,
gen: defaultGen,
offset: 0,
length: -1,
}, idempotent(true))
if err != nil {
t.Fatalf("NewRangeReader: %v", err)
}
defer r.Close()

buf := &bytes.Buffer{}
if _, err := io.Copy(buf, r); err != nil {
t.Fatalf("io.Copy: %v", err)
}
if !bytes.Equal(buf.Bytes(), randomBytes3MiB) {
t.Errorf("content does not match, got len %v, want len %v", buf.Len(), len(randomBytes3MiB))
}

}, WithDynamicReadReqStallTimeout(
&DynamicReadReqStallTimeoutConfig{
TargetPercentile: 0.99,
Min: time.Second,
}))
}

// createRetryTest sets up a test using the Retry Test API for the given
// instructions and returns the test ID. Any bucket the test needs must be
// created by the caller. This is intended for emulator tests of retry behavior
// that are not covered by conformance tests.
func createRetryTest(t *testing.T, project, bucket string, client storageClient, instructions map[string][]string) string {
func createRetryTest(t *testing.T, client storageClient, instructions map[string][]string) string {
t.Helper()
ctx := context.Background()

_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}

// Need the HTTP hostname to set up a retry test, as well as knowledge of
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// underlying transport to specify instructions.
Expand All @@ -1470,14 +1531,20 @@ func createRetryTest(t *testing.T, project, bucket string, client storageClient,
return et.id
}

// createObject uploads randomBytesToWrite as a new object in the given emulator
// bucket and reports the object's name, generation, and metageneration.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
	name, gen, metagen, err := createObjectWithContent(ctx, bucket, randomBytesToWrite)
	return name, gen, metagen, err
}

// createObjectWithContent creates an object in the emulator with the provided []byte contents,
// and returns its name, generation, and metageneration.
func createObjectWithContent(ctx context.Context, bucket string, bytes []byte) (string, int64, int64, error) {
prefix := time.Now().Nanosecond()
objName := fmt.Sprintf("%d-object", prefix)

w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
if _, err := w.Write(randomBytesToWrite); err != nil {
if _, err := w.Write(bytes); err != nil {
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
}
if err := w.Close(); err != nil {
Expand Down
86 changes: 71 additions & 15 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
Expand All @@ -47,13 +48,14 @@ import (
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
// storageClient interface.
//
// dynamicReadReqStallTimeout is only set when the client is configured with a
// DynamicReadReqStallTimeoutConfig; it is nil otherwise, in which case read
// requests are issued without a stall timeout.
type httpStorageClient struct {
	creds                      *google.Credentials
	hc                         *http.Client
	xmlHost                    string
	raw                        *raw.Service
	scheme                     string
	settings                   *settings
	config                     *storageConfig
	dynamicReadReqStallTimeout *dynamicDelay
}

// newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
Expand Down Expand Up @@ -128,14 +130,29 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
}

var dd *dynamicDelay
if config.DynamicReadReqStallTimeoutConfig != nil {
drrstConfig := config.DynamicReadReqStallTimeoutConfig
dd, err = newDynamicDelay(
drrstConfig.TargetPercentile,
getDynamicReadReqIncreaseRateFromEnv(),
getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min),
drrstConfig.Min,
defaultDynamicReqdReqMaxTimeout)
if err != nil {
return nil, fmt.Errorf("creating dynamic-delay: %w", err)
}
}

return &httpStorageClient{
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
dynamicReadReqStallTimeout: dd,
}, nil
}

Expand Down Expand Up @@ -858,7 +875,46 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa
reopen := readerReopen(ctx, req.Header, params, s,
// Issues the download request. When a dynamic stall timeout is configured,
// the request runs in a goroutine and is cancelled if no response arrives
// within the current timeout; the caller then sees context.DeadlineExceeded.
func(ctx context.Context) (*http.Response, error) {
	setHeadersFromCtx(ctx, req.Header)

	// No stall timeout configured: issue the request directly.
	if c.dynamicReadReqStallTimeout == nil {
		return c.hc.Do(req.WithContext(ctx))
	}

	cancelCtx, cancel := context.WithCancel(ctx)
	var (
		res *http.Response
		err error
	)

	// done is closed after the goroutine has finished writing res and err.
	// They must only be read here after receiving from done.
	done := make(chan struct{})
	go func() {
		defer close(done)
		reqStartTime := time.Now()
		res, err = c.hc.Do(req.WithContext(cancelCtx))
		if err == nil {
			c.dynamicReadReqStallTimeout.update(time.Since(reqStartTime))
		} else if errors.Is(err, context.Canceled) {
			// context.Canceled means operation took more than current dynamicTimeout,
			// hence should be increased.
			// NOTE(review): cancellation of the parent ctx by the caller also
			// lands here and increases the timeout — confirm this is intended.
			c.dynamicReadReqStallTimeout.increase()
		}
	}()

	// Read the timeout once so the logged value is the one actually armed.
	stallTimeout := c.dynamicReadReqStallTimeout.getValue()
	timer := time.NewTimer(stallTimeout)
	defer timer.Stop()

	// Wait until timeout or request is successful.
	select {
	case <-timer.C:
		log.Printf("stalled read-req cancelled after %fs", stallTimeout.Seconds())
		cancel()
		// Wait for the request goroutine to exit before touching res. This
		// avoids racing with its writes and ensures it is not leaked.
		<-done
		if res != nil && res.Body != nil {
			// The request completed just as the timer fired; discard it.
			res.Body.Close()
		}
		return nil, context.DeadlineExceeded
	case <-done:
		// The response body is read under cancelCtx after this function
		// returns, so cancel must not be called here. Cleanup of cancelCtx is
		// left to cancellation of the parent ctx.
		cancel = nil
	}
	return res, err
},
func() error { return setConditionsHeaders(req.Header, params.conds) },
func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
Expand Down
2 changes: 0 additions & 2 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
itesting "google.golang.org/api/iterator/testing"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -93,7 +92,6 @@ var (
)

func TestMain(m *testing.M) {
grpc.EnableTracing = true
cleanup := initIntegrationTest()
cleanupEmulatorClients := initEmulatorClients()
exit := m.Run()
Expand Down
Loading
Loading