Skip to content

Commit

Permalink
feat(thanos): disable retries when congestion control is enabled (#14867)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Nov 12, 2024
1 parent ac2e21f commit 947a66f
Show file tree
Hide file tree
Showing 20 changed files with 125 additions and 182 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ require (
github.com/richardartoul/molecule v1.0.0
github.com/schollz/progressbar/v3 v3.17.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015013301-cea7aa5d8037
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2585,8 +2585,8 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.480/go.mod
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm v1.0.480/go.mod h1:zaBIuDDs+rC74X8Aog+LSu91GFtHYRYDC196RGTm2jk=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13 h1:PQd6xZs18KGoCZJgL9eyYsrRGzzRwYCr4iXuehZm++w=
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo
bucketConfig.ContainerName = cfg.ContainerName
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.UserAssignedID = cfg.UserAssignedID
bucketConfig.HTTPConfig.Transport = cfg.Transport

if cfg.Endpoint != "" {
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
bucketConfig.Endpoint = cfg.Endpoint
}

return factory(logger, bucketConfig, name, func(rt http.RoundTripper) http.RoundTripper {
if cfg.Transport != nil {
rt = cfg.Transport
}
return rt
})
return factory(logger, bucketConfig, name, nil)
}
40 changes: 40 additions & 0 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"regexp"

"github.com/go-kit/log"
Expand Down Expand Up @@ -126,6 +128,44 @@ func (cfg *Config) Validate() error {
return cfg.StorageBackendConfig.Validate()
}

// disableRetries sets MaxRetries to 1 on the configuration of the given
// backend (S3, GCS, Azure or Swift). The Filesystem backend is accepted and
// left unchanged. Any other backend name results in an error.
func (cfg *Config) disableRetries(backend string) error {
	// Value written to MaxRetries for every backend that exposes the setting.
	const singleAttempt = 1

	switch {
	case backend == S3:
		cfg.S3.MaxRetries = singleAttempt
		return nil
	case backend == GCS:
		cfg.GCS.MaxRetries = singleAttempt
		return nil
	case backend == Azure:
		cfg.Azure.MaxRetries = singleAttempt
		return nil
	case backend == Swift:
		cfg.Swift.MaxRetries = singleAttempt
		return nil
	case backend == Filesystem:
		// Nothing to adjust for the filesystem backend.
		return nil
	}

	return fmt.Errorf("cannot disable retries for backend: %s", backend)
}

// configureTransport stores rt as the HTTP round tripper on the configuration
// of the given backend (S3, GCS, Azure or Swift). The Filesystem backend is
// accepted and left unchanged. Any other backend name results in an error.
func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) error {
	switch {
	case backend == S3:
		// S3 keeps its transport under the nested HTTP config.
		cfg.S3.HTTP.Transport = rt
		return nil
	case backend == GCS:
		cfg.GCS.Transport = rt
		return nil
	case backend == Azure:
		cfg.Azure.Transport = rt
		return nil
	case backend == Swift:
		cfg.Swift.Transport = rt
		return nil
	case backend == Filesystem:
		// Nothing to set for the filesystem backend.
		return nil
	}

	return fmt.Errorf("cannot configure transport for backend: %s", backend)
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) {
var (
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/bucket/gcs/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
bucketConfig.Bucket = cfg.BucketName
bucketConfig.ServiceAccount = cfg.ServiceAccount.String()
bucketConfig.ChunkSizeBytes = cfg.ChunkBufferSize
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.HTTPConfig.Transport = cfg.Transport

return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil)
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/bucket/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Config struct {
BucketName string `yaml:"bucket_name"`
ServiceAccount flagext.Secret `yaml:"service_account" doc:"description_method=GCSServiceAccountLongDescription"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
MaxRetries int `yaml:"max_retries"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
Expand All @@ -27,6 +28,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.BucketName, prefix+"gcs.bucket-name", "", "GCS bucket name")
f.Var(&cfg.ServiceAccount, prefix+"gcs.service-account", cfg.GCSServiceAccountShortDescription())
f.IntVar(&cfg.ChunkBufferSize, prefix+"gcs.chunk-buffer-size", 0, "The maximum size of the buffer that GCS client for a single PUT request. 0 to disable buffering.")
f.IntVar(&cfg.MaxRetries, prefix+"gcs.max-retries", 10, "The maximum number of retries for idempotent operations. Overrides the default gcs storage client behavior if this value is greater than 0. Set this to 1 to disable retries.")
}

func (cfg *Config) GCSServiceAccountShortDescription() string {
Expand Down
52 changes: 38 additions & 14 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package bucket

import (
"context"
"fmt"
"io"
"slices"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

type ObjectClientAdapter struct {
Expand All @@ -21,9 +26,33 @@ type ObjectClientAdapter struct {
isRetryableErr func(err error) bool
}

func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter {
if hedgedBucket == nil {
hedgedBucket = bucket
func NewObjectClient(ctx context.Context, backend string, cfg Config, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) {
if disableRetries {
if err := cfg.disableRetries(backend); err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}
}

bucket, err := NewClient(ctx, backend, cfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}

hedgedBucket := bucket
if hedgingCfg.At != 0 {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, fmt.Errorf("create hedged transport: %w", err)
}

if err := cfg.configureTransport(backend, hedgedTrasport); err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}

hedgedBucket, err = NewClient(ctx, backend, cfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}
}

o := &ObjectClientAdapter{
Expand All @@ -37,19 +66,14 @@ func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Log
},
}

for _, opt := range opts {
opt(o)
switch backend {
case GCS:
o.isRetryableErr = gcp.IsRetryableErr
case S3:
o.isRetryableErr = aws.IsRetryableErr
}

return o
}

type ClientOptions func(*ObjectClientAdapter)

func WithRetryableErrFunc(f func(err error) bool) ClientOptions {
return func(o *ObjectClientAdapter) {
o.isRetryableErr = f
}
return o, nil
}

func (o *ObjectClientAdapter) Stop() {
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/bucket/object_client_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"sort"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

func TestObjectClientAdapter_List(t *testing.T) {
Expand Down Expand Up @@ -95,8 +97,12 @@ func TestObjectClientAdapter_List(t *testing.T) {
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

client := NewObjectClientAdapter(newBucket, nil, nil)
client.bucket = newBucket
client, err := NewObjectClient(context.Background(), "filesystem", Config{
StorageBackendConfig: StorageBackendConfig{
Filesystem: config,
},
}, "test", hedging.Config{}, false, log.NewNopLogger())
require.NoError(t, err)

storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,6 @@ func newS3Config(cfg Config) (s3.Config, error) {
Enable: cfg.TraceConfig.Enabled,
},
STSEndpoint: cfg.STSEndpoint,
MaxRetries: cfg.MaxRetries,
}, nil
}
2 changes: 2 additions & 0 deletions pkg/storage/bucket/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Config struct {
PartSize uint64 `yaml:"part_size" category:"experimental"`
SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"`
STSEndpoint string `yaml:"sts_endpoint"`
MaxRetries int `yaml:"max_retries"`

SSE SSEConfig `yaml:"sse"`
HTTP HTTPConfig `yaml:"http"`
Expand Down Expand Up @@ -146,6 +147,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", ")))
f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.")
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.")
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)
Expand Down
13 changes: 4 additions & 9 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/exthttp"
"github.com/thanos-io/objstore/providers/swift"
yaml "gopkg.in/yaml.v2"
)

// NewBucketClient creates a new Swift bucket client
Expand Down Expand Up @@ -33,14 +33,9 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket,
// Hard-coded defaults.
ChunkSize: swift.DefaultConfig.ChunkSize,
UseDynamicLargeObjects: false,
HTTPConfig: exthttp.DefaultHTTPConfig,
}
bucketConfig.HTTPConfig.Transport = cfg.Transport

// Thanos currently doesn't support passing the config as is, but expects a YAML,
// so we're going to serialize it.
serialized, err := yaml.Marshal(bucketConfig)
if err != nil {
return nil, err
}

return swift.NewContainer(logger, serialized, nil)
return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
}
4 changes: 4 additions & 0 deletions pkg/storage/bucket/swift/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package swift

import (
"flag"
"net/http"
"time"
)

Expand All @@ -26,6 +27,9 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
}

// RegisterFlags registers the flags for Swift storage
Expand Down
44 changes: 0 additions & 44 deletions pkg/storage/chunk/client/aws/s3_thanos_object_client.go

This file was deleted.

This file was deleted.

Loading

0 comments on commit 947a66f

Please sign in to comment.