Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[querier] s3: add getObject retry #4453

Merged
merged 13 commits into from
Oct 25, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler
* [4519](https://github.com/grafana/loki/pull/4519) **DylanGuedes** and **replay**: Loki: Enable FIFO cache by default
* [4520](https://github.com/grafana/loki/pull/4520) **jordanrushing** and **owen-d**: Introduce overrides-exporter module for tenant limits
* [4453](https://github.com/grafana/loki/pull/4453) **liguozhong**: Loki: Implement retry to s3 chunk storage

# 2.3.0 (2021/08/06)

Expand Down
14 changes: 14 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,20 @@ aws:
# CLI flag: -s3.http.ca-file
[ca_file: <string> | default = ""]

# Configures back off when s3 get Object.
backoff_config:
# Minimum duration to back off.
# CLI flag: -s3.backoff-min-period
[min_period: <duration> | default = 100ms]

# The duration to back off.
# CLI flag: -s3.backoff-max-period
[max_period: <duration> | default = 3s]

# Number of times to back off and retry before failing.
# CLI flag: -s3.backoff-retries
[max_retries: <int> | default = 5]

# Configure the DynamoDB connection
dynamodb:
# URL for DynamoDB with escaped Key and Secret encoded. If only region is specified as a
Expand Down
36 changes: 25 additions & 11 deletions pkg/storage/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws"
cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -76,6 +77,7 @@ type S3Config struct {
HTTPConfig HTTPConfig `yaml:"http_config"`
SignatureVersion string `yaml:"signature_version"`
SSEConfig cortex_s3.SSEConfig `yaml:"sse"`
BackoffConfig backoff.Config `yaml:"backoff_config"`

Inject InjectRequestMiddleware `yaml:"-"`
}
Expand Down Expand Up @@ -116,6 +118,9 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.HTTPConfig.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "Set to true to skip verifying the certificate chain and hostname.")
f.StringVar(&cfg.HTTPConfig.CAFile, prefix+"s3.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the S3 endpoint.")
f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. Supported values are: %s.", strings.Join(supportedSignatureVersions, ", ")))
f.DurationVar(&cfg.BackoffConfig.MinBackoff, "s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object")
f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object")
f.IntVar(&cfg.BackoffConfig.MaxRetries, "s3.max-retries", 5, "Maximum number of times to retry when s3 get Object")
}

// Validate config and returns error on failure
Expand Down Expand Up @@ -153,6 +158,7 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig {
}

type S3ObjectClient struct {
cfg S3Config
bucketNames []string
S3 s3iface.S3API
sseConfig *SSEParsedConfig
Expand Down Expand Up @@ -182,6 +188,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
}

client := S3ObjectClient{
cfg: cfg,
S3: s3Client,
bucketNames: bucketNames,
sseConfig: sseCfg,
Expand Down Expand Up @@ -360,19 +367,26 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
// Map the key into a bucket
bucket := a.bucketFromKey(objectKey)

err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
retries := backoff.New(ctx, a.cfg.BackoffConfig)
err := ctx.Err()
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
for retries.Ongoing() {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject")
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Comment on lines +371 to +376
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err := ctx.Err()
for retries.Ongoing() {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject")
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
for retries.Ongoing() {
if err := ctx.Err(); err != nil {
return nil, errors.Wrap(err, "ctx related error during s3 getObject")
}
err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {

var requestErr error
resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
return requestErr
})
return err
})
if err != nil {
return nil, err
if err == nil {
return resp.Body, nil
}
retries.Wait()
}

return resp.Body, nil
return nil, errors.Wrap(err, "failed to get s3 object")
}

// PutObject into the store
Expand Down