Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): dynamic read request stall timeout #10958

Merged
merged 12 commits into from
Oct 14, 2024
65 changes: 61 additions & 4 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package storage

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
Expand Down Expand Up @@ -948,7 +950,6 @@ func initEmulatorClients() func() error {
log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
return noopCloser
}

emulatorClients = map[string]storageClient{
"http": httpClient,
"grpc": grpcClient,
Expand Down Expand Up @@ -1436,6 +1437,52 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
})
}

// Test validates the retry for stalled read-request, when client is created with
// WithDynamicReadReqStallTimeout.
func TestRetryReadReqStallEmulated(t *testing.T) {
multiTransportTest(skipJSONReads(skipGRPC("not supported"), "not supported"), t, func(t *testing.T, ctx context.Context, project, _ string, client *Client) {
// Setup bucket and upload object.
bucket := fmt.Sprintf("http-bucket-%d", time.Now().Nanosecond())
if _, err := client.tc.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}, nil); err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

name, _, _, err := createObjectWithContent(ctx, bucket, randomBytes3MiB)
if err != nil {
t.Fatalf("createObject: %v", err)
}

// Plant stall at start for 2s.
instructions := map[string][]string{"storage.objects.get": {"stall-for-2s-after-0K"}}
testID := plantRetryInstructions(t, client.tc, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

r, err := client.tc.NewRangeReader(ctx, &newRangeReaderParams{
bucket: bucket,
object: name,
gen: defaultGen,
offset: 0,
length: -1,
}, idempotent(true))
if err != nil {
t.Fatalf("NewRangeReader: %v", err)
}
defer r.Close()

buf := &bytes.Buffer{}
if _, err := io.Copy(buf, r); err != nil {
t.Fatalf("io.Copy: %v", err)
}
if !bytes.Equal(buf.Bytes(), randomBytes3MiB) {
t.Errorf("content does not match, got len %v, want len %v", buf.Len(), len(randomBytes3MiB))
}

}, WithDynamicReadReqStallTimeout(0.99, 15, time.Second, time.Second, 2*time.Second))
}

// createRetryTest creates a bucket in the emulator and sets up a test using the
// Retry Test API for the given instructions. This is intended for emulator tests
// of retry behavior that are not covered by conformance tests.
Expand All @@ -1448,6 +1495,10 @@ func createRetryTest(t *testing.T, project, bucket string, client storageClient,
t.Fatalf("creating bucket: %v", err)
}

return plantRetryInstructions(t, client, instructions)
}

func plantRetryInstructions(t *testing.T, client storageClient, instructions map[string][]string) string {
// Need the HTTP hostname to set up a retry test, as well as knowledge of
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// underlying transport to specify instructions.
host := os.Getenv("STORAGE_EMULATOR_HOST")
Expand All @@ -1470,14 +1521,20 @@ func createRetryTest(t *testing.T, project, bucket string, client storageClient,
return et.id
}

// createObject creates an object in the emulator and returns its name, generation, and
// metageneration.
// createObject creates an object in the emulator whose content is
// randomBytesToWrite and returns its name, generation, and metageneration.
// It delegates to createObjectWithContent.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
return createObjectWithContent(ctx, bucket, randomBytesToWrite)
}

// createObjectWithContent creates an object in the emulator with the provided []byte contents,
// and returns its name, generation, and metageneration.
func createObjectWithContent(ctx context.Context, bucket string, bytes []byte) (string, int64, int64, error) {
prefix := time.Now().Nanosecond()
objName := fmt.Sprintf("%d-object", prefix)

w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
if _, err := w.Write(randomBytesToWrite); err != nil {
if _, err := w.Write(bytes); err != nil {
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
}
if err := w.Close(); err != nil {
Expand Down
86 changes: 71 additions & 15 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
Expand All @@ -47,13 +48,14 @@ import (
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
// storageClient interface.
type httpStorageClient struct {
	creds    *google.Credentials
	hc       *http.Client
	xmlHost  string
	raw      *raw.Service
	scheme   string
	settings *settings
	config   *storageConfig
	// dynamicReadReqStallTimeout, when non-nil, supplies the stall timeout
	// applied to XML read requests. It is nil unless the client was configured
	// with a dynamic read request stall timeout.
	dynamicReadReqStallTimeout *dynamicDelay
}

// newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
Expand Down Expand Up @@ -128,14 +130,29 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
}

var dd *dynamicDelay
if config.dynamicReadReqStallTimeout != nil {
drrstConfig := config.dynamicReadReqStallTimeout
dd, err = newDynamicDelay(
drrstConfig.targetPercentile,
drrstConfig.increaseRate,
drrstConfig.initial,
drrstConfig.min,
drrstConfig.max)
if err != nil {
return nil, fmt.Errorf("creating dynamic-delay: %w", err)
}
}

return &httpStorageClient{
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
dynamicReadReqStallTimeout: dd,
}, nil
}

Expand Down Expand Up @@ -858,7 +875,46 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa
reopen := readerReopen(ctx, req.Header, params, s,
	// Download function. Without a dynamic stall timeout it simply issues the
	// request. With one, the request runs in a goroutine and is cancelled if
	// it has not returned within the current stall timeout, in which case
	// context.DeadlineExceeded is returned.
	func(ctx context.Context) (*http.Response, error) {
		setHeadersFromCtx(ctx, req.Header)

		if c.dynamicReadReqStallTimeout == nil {
			return c.hc.Do(req.WithContext(ctx))
		}

		cancelCtx, cancel := context.WithCancel(ctx)
		var (
			res *http.Response
			err error
		)

		// done is closed once the request goroutine has finished writing res
		// and err. Receiving from it is the only synchronization point that
		// makes reading those variables safe.
		done := make(chan struct{})
		go func() {
			defer close(done)
			reqStartTime := time.Now()
			res, err = c.hc.Do(req.WithContext(cancelCtx))
			switch {
			case err == nil:
				c.dynamicReadReqStallTimeout.update(time.Since(reqStartTime))
			case errors.Is(err, context.Canceled):
				// context.Canceled means operation took more than current dynamicTimeout,
				// hence should be increased.
				c.dynamicReadReqStallTimeout.increase()
			}
		}()

		// Read the timeout once so the value that is logged is the value that
		// was actually waited for (the goroutine may change it concurrently).
		stallTimeout := c.dynamicReadReqStallTimeout.getValue()
		timer := time.NewTimer(stallTimeout)
		defer timer.Stop()

		// Wait until timeout or request is successful.
		select {
		case <-timer.C:
			log.Printf("stalled read-req cancelled after %fs", stallTimeout.Seconds())
			cancel()
			// Wait for the goroutine to observe the cancellation and exit, so it
			// is not leaked and res/err are no longer being written.
			<-done
			if res != nil && res.Body != nil {
				// The request completed just as the timeout fired; discard it.
				res.Body.Close()
			}
			return nil, context.DeadlineExceeded
		case <-done:
			if err != nil {
				// The request failed, so nothing else depends on cancelCtx.
				cancel()
			}
			// On success cancelCtx must stay alive: the request context also
			// governs reading the response body, so cancelling here would
			// abort the caller's read.
			cancel = nil
		}
		return res, err
	},
	func() error { return setConditionsHeaders(req.Header, params.conds) },
	func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
Expand Down
2 changes: 0 additions & 2 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
itesting "google.golang.org/api/iterator/testing"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -93,7 +92,6 @@ var (
)

func TestMain(m *testing.M) {
grpc.EnableTracing = true
cleanup := initIntegrationTest()
cleanupEmulatorClients := initEmulatorClients()
exit := m.Run()
Expand Down
65 changes: 62 additions & 3 deletions storage/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
package storage

import (
"time"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
)

// storageConfig contains the Storage client option configuration that can be
// set through storageClientOptions.
type storageConfig struct {
useJSONforReads bool
readAPIWasSet bool
disableClientMetrics bool
useJSONforReads bool
readAPIWasSet bool
disableClientMetrics bool
dynamicReadReqStallTimeout *dynamicReadReqStallTimeout
}

// newStorageConfig generates a new storageConfig with all the given
Expand Down Expand Up @@ -108,3 +111,59 @@ func WithDisabledClientMetrics() option.ClientOption {
// ApplyStorageOpt copies the option's disabledClientMetrics value into the
// disableClientMetrics field of the given storage config.
func (w *withDisabledClientMetrics) ApplyStorageOpt(c *storageConfig) {
c.disableClientMetrics = w.disabledClientMetrics
}

// WithDynamicReadReqStallTimeout is an option that may be passed to [NewClient].
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// It enables the client to retry the stalled read request, happens as part of
// storage.Reader creation. As the name suggest, timeout is adjusted dynamically
// based on past observed read-req latencies.
//
// This is only supported for the read operation and that too for http(XML) client.
// Grpc read-operation will be supported soon.
//
// Here, the input parameter decides the value of dynamic-timeout.
// targetPercentile is the desired percentile of the observed latencies.
// increaseRate determines the rate, timeout is adjusted. High means slow increase in
// timeout and low means rapid increase.
// initialTimeout decides the initial timeout.
// minTimeout, maxTimeout is the lower & upper bound of the timeout.
//
// TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle.
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// Currently, dynamicTimeout is kept at the client and hence shared across all the
// BucketHandle, which is not not the ideal state. As latency depends on location of VM
// and Bucket, and read latency of different buckets may lie in different range.
// Hence hence having a separate dynamicTimeout instance at BucketHandle level will
// be better
func WithDynamicReadReqStallTimeout(
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
targetPercentile float64,
increaseRate float64,
initialTimeout time.Duration,
minTimeout time.Duration,
maxTimeout time.Duration) option.ClientOption {
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
return &withDynamicReadReqStallTimeout{
dynamicReadReqStallTimeout: &dynamicReadReqStallTimeout{
targetPercentile: targetPercentile,
increaseRate: increaseRate,
initial: initialTimeout,
min: minTimeout,
max: maxTimeout,
},
}
}

// dynamicReadReqStallTimeout holds the parameters passed to
// WithDynamicReadReqStallTimeout.
type dynamicReadReqStallTimeout struct {
// min and max are the lower and upper bounds of the timeout.
min time.Duration
max time.Duration
// initial is the initial timeout.
initial time.Duration

// targetPercentile is the desired percentile of the observed latencies.
targetPercentile float64
// increaseRate determines the rate at which the timeout is adjusted.
increaseRate float64
}

// withDynamicReadReqStallTimeout is the client option value returned by
// WithDynamicReadReqStallTimeout; it carries the stall timeout parameters
// until they are applied to a storageConfig.
type withDynamicReadReqStallTimeout struct {
internaloption.EmbeddableAdapter
dynamicReadReqStallTimeout *dynamicReadReqStallTimeout
}

// ApplyStorageOpt stores the option's dynamic read request stall timeout
// parameters on the given storage config.
func (w *withDynamicReadReqStallTimeout) ApplyStorageOpt(config *storageConfig) {
	config.dynamicReadReqStallTimeout = w.dynamicReadReqStallTimeout
}
21 changes: 19 additions & 2 deletions storage/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package storage

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
Expand Down Expand Up @@ -81,6 +82,22 @@ func TestApplyStorageOpt(t *testing.T) {
disableClientMetrics: true,
},
},
{
desc: "set dynamic read req stall timeout option",
opts: []option.ClientOption{WithDynamicReadReqStallTimeout(0.99, 15, time.Second, time.Second, 2*time.Second)},
want: storageConfig{
useJSONforReads: false,
readAPIWasSet: false,
disableClientMetrics: false,
dynamicReadReqStallTimeout: &dynamicReadReqStallTimeout{
targetPercentile: 0.99,
increaseRate: 15,
initial: time.Second,
min: time.Second,
max: 2 * time.Second,
},
},
},
} {
t.Run(test.desc, func(t *testing.T) {
var got storageConfig
Expand All @@ -89,8 +106,8 @@ func TestApplyStorageOpt(t *testing.T) {
storageOpt.ApplyStorageOpt(&got)
}
}
if !cmp.Equal(got, test.want, cmp.AllowUnexported(storageConfig{})) {
t.Errorf(cmp.Diff(got, test.want, cmp.AllowUnexported(storageConfig{})))
if !cmp.Equal(got, test.want, cmp.AllowUnexported(storageConfig{}, dynamicReadReqStallTimeout{})) {
t.Errorf(cmp.Diff(got, test.want, cmp.AllowUnexported(storageConfig{}, dynamicReadReqStallTimeout{})))
}
})
}
Expand Down
Loading