diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 0bd8ec4e92e53..06f8d128f850d 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -84,10 +84,9 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag. } func (cfg *StorageBackendConfig) Validate() error { - // TODO: enable validation when s3 flags are registered - // if err := cfg.S3.Validate(); err != nil { - // return err - //} + if err := cfg.S3.Validate(); err != nil { + return err + } return nil } diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index 107e18b3c7bb0..5d904d8e5fe9b 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -4,6 +4,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/exthttp" "github.com/thanos-io/objstore/providers/s3" ) @@ -38,17 +39,28 @@ func newS3Config(cfg Config) (s3.Config, error) { return s3.Config{}, err } + putUserMetadata := map[string]string{} + + if cfg.StorageClass != "" { + putUserMetadata[awsStorageClassHeader] = cfg.StorageClass + } + return s3.Config{ - Bucket: cfg.BucketName, - Endpoint: cfg.Endpoint, - Region: cfg.Region, - AccessKey: cfg.AccessKeyID, - SecretKey: cfg.SecretAccessKey.String(), - SessionToken: cfg.SessionToken.String(), - Insecure: cfg.Insecure, - DisableDualstack: cfg.DisableDualstack, - SSEConfig: sseCfg, - PutUserMetadata: map[string]string{awsStorageClassHeader: cfg.StorageClass}, + Bucket: cfg.BucketName, + Endpoint: cfg.Endpoint, + Region: cfg.Region, + AccessKey: cfg.AccessKeyID, + SecretKey: cfg.SecretAccessKey.String(), + SessionToken: cfg.SessionToken.String(), + Insecure: cfg.Insecure, + PutUserMetadata: putUserMetadata, + SendContentMd5: cfg.SendContentMd5, + SSEConfig: sseCfg, + DisableDualstack: !cfg.DualstackEnabled, + ListObjectsVersion: cfg.ListObjectsVersion, + BucketLookupType: cfg.BucketLookupType, + AWSSDKAuth: cfg.NativeAWSAuthEnabled, + PartSize: cfg.PartSize, HTTPConfig: s3.HTTPConfig{ IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout), ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout), @@ -59,6 +71,16 @@ func newS3Config(cfg Config) (s3.Config, error) { MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost, MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost, Transport: cfg.HTTP.Transport, + TLSConfig: exthttp.TLSConfig{ + CAFile: cfg.HTTP.TLSConfig.CAPath, + CertFile: cfg.HTTP.TLSConfig.CertPath, + KeyFile: cfg.HTTP.TLSConfig.KeyPath, + ServerName: cfg.HTTP.TLSConfig.ServerName, + }, + }, + TraceConfig: s3.TraceConfig{ + Enable: cfg.TraceConfig.Enabled, }, + STSEndpoint: cfg.STSEndpoint, }, nil } diff --git a/pkg/storage/bucket/s3/config.go b/pkg/storage/bucket/s3/config.go index 32db169f450f6..792f93f752b32 100644 --- a/pkg/storage/bucket/s3/config.go +++ b/pkg/storage/bucket/s3/config.go @@ -5,23 +5,20 @@ import ( "flag" "fmt" "net/http" + "slices" "strings" + "time" + s3_service "github.com/aws/aws-sdk-go/service/s3" "github.com/grafana/dskit/flagext" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/pkg/errors" "github.com/thanos-io/objstore/providers/s3" - bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http" - "github.com/grafana/loki/v3/pkg/storage/common/aws" "github.com/grafana/loki/v3/pkg/util" ) const ( - // Signature Version 2 is being turned off (deprecated) in Amazon S3. Amazon S3 will then only accept API requests that are signed using Signature Version 4. - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingAWSSDK.html#UsingAWSSDK-sig2-deprecation - SignatureVersionV4 = "v4" - // SSEKMS config type constant to configure S3 server side encryption using KMS // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html SSEKMS = "SSE-KMS" @@ -32,41 +29,99 @@ const ( ) var ( - supportedSignatureVersions = []string{SignatureVersionV4} - supportedSSETypes = []string{SSEKMS, SSES3} - errUnsupportedSignatureVersion = errors.New("unsupported signature version") - errUnsupportedSSEType = errors.New("unsupported S3 SSE type") - errInvalidSSEContext = errors.New("invalid S3 SSE encryption context") + supportedSSETypes = []string{SSEKMS, SSES3} + supportedStorageClasses = s3_service.ObjectStorageClass_Values() + supportedBucketLookupTypes = thanosS3BucketLookupTypesValues() + + errUnsupportedSSEType = errors.New("unsupported S3 SSE type") + errUnsupportedStorageClass = fmt.Errorf("unsupported S3 storage class (supported values: %s)", strings.Join(supportedStorageClasses, ", ")) + errInvalidSSEContext = errors.New("invalid S3 SSE encryption context") + errInvalidEndpointPrefix = errors.New("the endpoint must not prefixed with the bucket name") + errInvalidSTSEndpoint = errors.New("sts-endpoint must be a valid url") ) +var thanosS3BucketLookupTypes = map[string]s3.BucketLookupType{ + s3.AutoLookup.String(): s3.AutoLookup, + s3.VirtualHostLookup.String(): s3.VirtualHostLookup, + s3.PathLookup.String(): s3.PathLookup, +} + +func thanosS3BucketLookupTypesValues() (list []string) { + for k := range thanosS3BucketLookupTypes { + list = append(list, k) + } + // sort the list for consistent output in help, where it's used + slices.Sort(list) + return list +} + // HTTPConfig stores the http.Transport configuration for the s3 minio client. type HTTPConfig struct { - bucket_http.Config `yaml:",inline"` + IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"` + ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"` + InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"` + TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"` + ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"` + MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"` + MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"` + MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"` // Allow upstream callers to inject a round tripper Transport http.RoundTripper `yaml:"-"` + + TLSConfig TLSConfig `yaml:",inline"` +} + +// TLSConfig configures the options for TLS connections. +type TLSConfig struct { + CAPath string `yaml:"tls_ca_path" category:"advanced"` + CertPath string `yaml:"tls_cert_path" category:"advanced"` + KeyPath string `yaml:"tls_key_path" category:"advanced"` + ServerName string `yaml:"tls_server_name" category:"advanced"` } // RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.Config.RegisterFlagsWithPrefix(prefix+"s3.", f) + f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.") + f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.") + f.BoolVar(&cfg.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "If the client connects to S3 via HTTPS and this option is enabled, the client will accept any certificate and hostname.") + f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"s3.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.") + f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"s3.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.") + f.IntVar(&cfg.MaxIdleConns, prefix+"s3.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.") + f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"s3.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.") + f.IntVar(&cfg.MaxConnsPerHost, prefix+"s3.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.") + cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f) +} + +// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix. +func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.CAPath, prefix+"s3.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.") + f.StringVar(&cfg.CertPath, prefix+"s3.http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.") + f.StringVar(&cfg.KeyPath, prefix+"s3.http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.") + f.StringVar(&cfg.ServerName, prefix+"s3.http.tls-server-name", "", "Override the expected name on the server certificate.") } // Config holds the config options for an S3 backend type Config struct { - Endpoint string `yaml:"endpoint"` - Region string `yaml:"region"` - BucketName string `yaml:"bucket_name"` - SecretAccessKey flagext.Secret `yaml:"secret_access_key"` - SessionToken flagext.Secret `yaml:"session_token"` - AccessKeyID string `yaml:"access_key_id"` - Insecure bool `yaml:"insecure"` - DisableDualstack bool `yaml:"disable_dualstack"` - SignatureVersion string `yaml:"signature_version"` - StorageClass string `yaml:"storage_class"` + Endpoint string `yaml:"endpoint"` + Region string `yaml:"region"` + BucketName string `yaml:"bucket_name"` + SecretAccessKey flagext.Secret `yaml:"secret_access_key"` + AccessKeyID string `yaml:"access_key_id"` + SessionToken flagext.Secret `yaml:"session_token"` + Insecure bool `yaml:"insecure" category:"advanced"` + ListObjectsVersion string `yaml:"list_objects_version" category:"advanced"` + BucketLookupType s3.BucketLookupType `yaml:"bucket_lookup_type" category:"advanced"` + DualstackEnabled bool `yaml:"dualstack_enabled" category:"experimental"` + StorageClass string `yaml:"storage_class" category:"experimental"` + NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled" category:"experimental"` + PartSize uint64 `yaml:"part_size" category:"experimental"` + SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"` + STSEndpoint string `yaml:"sts_endpoint"` - SSE SSEConfig `yaml:"sse"` - HTTP HTTPConfig `yaml:"http"` + SSE SSEConfig `yaml:"sse"` + HTTP HTTPConfig `yaml:"http"` + TraceConfig TraceConfig `yaml:"trace"` } // RegisterFlags registers the flags for s3 storage with the provided prefix @@ -83,21 +138,32 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.Region, prefix+"s3.region", "", "S3 region. If unset, the client will issue a S3 GetBucketLocation API call to autodetect it.") f.StringVar(&cfg.Endpoint, prefix+"s3.endpoint", "", "The S3 bucket endpoint. It could be an AWS S3 endpoint listed at https://docs.aws.amazon.com/general/latest/gr/s3.html or the address of an S3-compatible service in hostname:port format.") f.BoolVar(&cfg.Insecure, prefix+"s3.insecure", false, "If enabled, use http:// for the S3 endpoint instead of https://. This could be useful in local dev/test environments while using an S3-compatible backend storage, like Minio.") - f.BoolVar(&cfg.DisableDualstack, prefix+"s3.disable-dualstack", false, "Disable forcing S3 dualstack endpoint usage.") - f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. Supported values are: %s.", strings.Join(supportedSignatureVersions, ", "))) - f.StringVar(&cfg.StorageClass, prefix+"s3.storage-class", aws.StorageClassStandard, "The S3 storage class to use. Details can be found at https://aws.amazon.com/s3/storage-classes/.") + f.StringVar(&cfg.ListObjectsVersion, prefix+"s3.list-objects-version", "", "Use a specific version of the S3 list object API. Supported values are v1 or v2. Default is unset.") + f.StringVar(&cfg.StorageClass, prefix+"s3.storage-class", "", "The S3 storage class to use, not set by default. Details can be found at https://aws.amazon.com/s3/storage-classes/. Supported values are: "+strings.Join(supportedStorageClasses, ", ")) + f.BoolVar(&cfg.NativeAWSAuthEnabled, prefix+"s3.native-aws-auth-enabled", false, "If enabled, it will use the default authentication methods of the AWS SDK for go based on known environment variables and known AWS config files.") + f.Uint64Var(&cfg.PartSize, prefix+"s3.part-size", 0, "The minimum file size in bytes used for multipart uploads. If 0, the value is optimally computed for each object.") + f.BoolVar(&cfg.SendContentMd5, prefix+"s3.send-content-md5", false, "If enabled, a Content-MD5 header is sent with S3 Put Object requests. Consumes more resources to compute the MD5, but may improve compatibility with object storage services that do not support checksums.") + f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", "))) + f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.") + f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.") cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f) cfg.HTTP.RegisterFlagsWithPrefix(prefix, f) + cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f) } // Validate config and returns error on failure func (cfg *Config) Validate() error { - if !util.StringsContain(supportedSignatureVersions, cfg.SignatureVersion) { - return errUnsupportedSignatureVersion + if cfg.Endpoint != "" { + endpoint := strings.Split(cfg.Endpoint, ".") + if cfg.BucketName != "" && endpoint[0] != "" && endpoint[0] == cfg.BucketName { + return errInvalidEndpointPrefix + } } - - if err := aws.ValidateStorageClass(cfg.StorageClass); err != nil { - return err + if cfg.STSEndpoint != "" && !util.IsValidURL(cfg.STSEndpoint) { + return errInvalidSTSEndpoint + } + if !slices.Contains(supportedStorageClasses, cfg.StorageClass) && cfg.StorageClass != "" { + return errUnsupportedStorageClass } return cfg.SSE.Validate() @@ -191,3 +257,35 @@ func parseKMSEncryptionContext(data string) (map[string]string, error) { err := errors.Wrap(json.Unmarshal([]byte(data), &decoded), "unable to parse KMS encryption context") return decoded, err } + +type TraceConfig struct { + Enabled bool `yaml:"enabled" category:"advanced"` +} + +func (cfg *TraceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "When enabled, low-level S3 HTTP operation information is logged at the debug level.") +} + +// bucketLookupTypeValue is an adapter between s3.BucketLookupType and flag.Value. +type bucketLookupTypeValue s3.BucketLookupType + +func newBucketLookupTypeValue(value s3.BucketLookupType, p *s3.BucketLookupType) *bucketLookupTypeValue { + *p = value + return (*bucketLookupTypeValue)(p) +} + +func (v *bucketLookupTypeValue) String() string { + if v == nil { + return s3.AutoLookup.String() + } + return s3.BucketLookupType(*v).String() +} + +func (v *bucketLookupTypeValue) Set(s string) error { + t, ok := thanosS3BucketLookupTypes[s] + if !ok { + return fmt.Errorf("unsupported bucket lookup type: %s", s) + } + *v = bucketLookupTypeValue(t) + return nil +} diff --git a/pkg/storage/bucket/s3/config_test.go b/pkg/storage/bucket/s3/config_test.go index 3f32e8f847936..078353b68bd86 100644 --- a/pkg/storage/bucket/s3/config_test.go +++ b/pkg/storage/bucket/s3/config_test.go @@ -1,127 +1,23 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/storage/bucket/s3/config_test.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Cortex Authors. + package s3 import ( + "bytes" "encoding/base64" - "fmt" "net/http" - "strings" "testing" - "time" + s3_service "github.com/aws/aws-sdk-go/service/s3" "github.com/grafana/dskit/flagext" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" - - bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http" - "github.com/grafana/loki/v3/pkg/storage/common/aws" + "gopkg.in/yaml.v3" ) -// defaultConfig should match the default flag values defined in RegisterFlagsWithPrefix. -var defaultConfig = Config{ - SignatureVersion: SignatureVersionV4, - StorageClass: aws.StorageClassStandard, - HTTP: HTTPConfig{ - Config: bucket_http.Config{ - IdleConnTimeout: 90 * time.Second, - ResponseHeaderTimeout: 2 * time.Minute, - InsecureSkipVerify: false, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, - MaxConnsPerHost: 0, - }, - }, -} - -func TestConfig(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - config string - expectedConfig Config - expectedErr error - }{ - "default config": { - config: "", - expectedConfig: defaultConfig, - expectedErr: nil, - }, - "custom config": { - config: ` -endpoint: test-endpoint -region: test-region -bucket_name: test-bucket-name -secret_access_key: test-secret-access-key -access_key_id: test-access-key-id -insecure: true -signature_version: test-signature-version -storage_class: test-storage-class -disable_dualstack: true -sse: - type: test-type - kms_key_id: test-kms-key-id - kms_encryption_context: test-kms-encryption-context -http: - idle_conn_timeout: 2s - response_header_timeout: 3s - insecure_skip_verify: true - tls_handshake_timeout: 4s - expect_continue_timeout: 5s - max_idle_connections: 6 - max_idle_connections_per_host: 7 - max_connections_per_host: 8 -`, - expectedConfig: Config{ - Endpoint: "test-endpoint", - Region: "test-region", - BucketName: "test-bucket-name", - SecretAccessKey: flagext.SecretWithValue("test-secret-access-key"), - AccessKeyID: "test-access-key-id", - Insecure: true, - SignatureVersion: "test-signature-version", - StorageClass: "test-storage-class", - DisableDualstack: true, - SSE: SSEConfig{ - Type: "test-type", - KMSKeyID: "test-kms-key-id", - KMSEncryptionContext: "test-kms-encryption-context", - }, - HTTP: HTTPConfig{ - Config: bucket_http.Config{ - IdleConnTimeout: 2 * time.Second, - ResponseHeaderTimeout: 3 * time.Second, - InsecureSkipVerify: true, - TLSHandshakeTimeout: 4 * time.Second, - ExpectContinueTimeout: 5 * time.Second, - MaxIdleConns: 6, - MaxIdleConnsPerHost: 7, - MaxConnsPerHost: 8, - }, - }, - }, - expectedErr: nil, - }, - "invalid type": { - config: `insecure: foo`, - expectedConfig: defaultConfig, - expectedErr: &yaml.TypeError{Errors: []string{"line 1: cannot unmarshal !!str `foo` into bool"}}, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - cfg := Config{} - flagext.DefaultValues(&cfg) - - err := yaml.Unmarshal([]byte(testData.config), &cfg) - require.Equal(t, testData.expectedErr, err) - require.Equal(t, testData.expectedConfig, cfg) - }) - } -} - func TestSSEConfig_Validate(t *testing.T) { tests := map[string]struct { setup func() *SSEConfig @@ -169,6 +65,85 @@ func TestSSEConfig_Validate(t *testing.T) { } } +func TestConfig_Validate(t *testing.T) { + tests := map[string]struct { + setup func() *Config + expected error + }{ + "should pass with default config": { + setup: func() *Config { + sseCfg := &SSEConfig{} + flagext.DefaultValues(sseCfg) + cfg := &Config{ + Endpoint: "s3.eu-central-1.amazonaws.com", + BucketName: "mimir-block", + SSE: *sseCfg, + StorageClass: s3_service.StorageClassStandard, + } + return cfg + }, + }, + "should fail if invalid storage class is set": { + setup: func() *Config { + return &Config{ + StorageClass: "foo", + } + }, + expected: errUnsupportedStorageClass, + }, + "should fail on invalid endpoint prefix": { + setup: func() *Config { + return &Config{ + Endpoint: "mimir-blocks.s3.eu-central-1.amazonaws.com", + BucketName: "mimir-blocks", + StorageClass: s3_service.StorageClassStandard, + } + }, + expected: errInvalidEndpointPrefix, + }, + "should pass if native_aws_auth_enabled is set": { + setup: func() *Config { + return &Config{ + NativeAWSAuthEnabled: true, + } + }, + }, + "should pass with using sts endpoint": { + setup: func() *Config { + sseCfg := &SSEConfig{} + flagext.DefaultValues(sseCfg) + cfg := &Config{ + BucketName: "mimir-block", + SSE: *sseCfg, + StorageClass: s3_service.StorageClassStandard, + STSEndpoint: "https://sts.eu-central-1.amazonaws.com", + } + return cfg + }, + }, + "should not pass with using sts endpoint as its using an invalid url": { + setup: func() *Config { + sseCfg := &SSEConfig{} + flagext.DefaultValues(sseCfg) + cfg := &Config{ + BucketName: "mimir-block", + SSE: *sseCfg, + StorageClass: s3_service.StorageClassStandard, + STSEndpoint: "sts.eu-central-1.amazonaws.com", + } + return cfg + }, + expected: errInvalidSTSEndpoint, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, testData.setup().Validate()) + }) + } +} + func TestSSEConfig_BuildMinioConfig(t *testing.T) { tests := map[string]struct { cfg *SSEConfig @@ -225,31 +200,32 @@ func TestParseKMSEncryptionContext(t *testing.T) { assert.Equal(t, expected, actual) } -func TestConfig_Validate(t *testing.T) { - tests := map[string]struct { - cfg Config - expectedErr error - }{ - "should fail if invalid signature version is set": { - Config{SignatureVersion: "foo"}, - errUnsupportedSignatureVersion, - }, - "should pass if valid signature version is set": { - defaultConfig, - nil, - }, - "should fail if invalid storage class is set": { - Config{SignatureVersion: SignatureVersionV4, StorageClass: "foo"}, - fmt.Errorf("unsupported S3 storage class: foo. Supported values: %s", strings.Join(aws.SupportedStorageClasses, ", ")), - }, - "should pass if valid storage signature version is set": { - Config{SignatureVersion: SignatureVersionV4, StorageClass: aws.StorageClassStandardInfrequentAccess}, - nil, - }, - } +func TestConfigParsesCredentialsInlineWithSessionToken(t *testing.T) { + var cfg = Config{} + yamlCfg := ` +access_key_id: access key id +secret_access_key: secret access key +session_token: session token +` + err := yaml.Unmarshal([]byte(yamlCfg), &cfg) + require.NoError(t, err) + + require.Equal(t, cfg.AccessKeyID, "access key id") + require.Equal(t, cfg.SecretAccessKey.String(), "secret access key") + require.Equal(t, cfg.SessionToken.String(), "session token") +} - for name, test := range tests { - actual := test.cfg.Validate() - assert.Equal(t, test.expectedErr, actual, name) +func TestConfigRedactsCredentials(t *testing.T) { + cfg := Config{ + AccessKeyID: "access key id", + SecretAccessKey: flagext.SecretWithValue("secret access key"), + SessionToken: flagext.SecretWithValue("session token"), } + + output, err := yaml.Marshal(cfg) + require.NoError(t, err) + + require.True(t, bytes.Contains(output, []byte("access key id"))) + require.False(t, bytes.Contains(output, []byte("secret access id"))) + require.False(t, bytes.Contains(output, []byte("session token"))) } diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index 9ab8c9116339f..65817f38c9d9f 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -563,7 +563,7 @@ func isContextErr(err error) bool { } // IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts. -func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool { +func IsStorageTimeoutErr(err error) bool { // TODO(dannyk): move these out to be generic // context errors are all client-side if isContextErr(err) { @@ -599,7 +599,7 @@ func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool { } // IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling. -func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool { +func IsStorageThrottledErr(err error) bool { if rerr, ok := err.(awserr.RequestFailure); ok { // https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html @@ -609,6 +609,11 @@ func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool { return false } + +func IsRetryableErr(err error) bool { + return IsStorageTimeoutErr(err) || IsStorageThrottledErr(err) +} + func (a *S3ObjectClient) IsRetryableErr(err error) bool { - return a.IsStorageTimeoutErr(err) || a.IsStorageThrottledErr(err) + return IsRetryableErr(err) } diff --git a/pkg/storage/chunk/client/aws/s3_thanos_object_client.go b/pkg/storage/chunk/client/aws/s3_thanos_object_client.go new file mode 100644 index 0000000000000..e00ded920d552 --- /dev/null +++ b/pkg/storage/chunk/client/aws/s3_thanos_object_client.go @@ -0,0 +1,44 @@ +package aws + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/storage/bucket" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" +) + +func NewS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { + b, err := newS3ThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) + if err != nil { + return nil, err + } + + var hedged objstore.Bucket + if hedgingCfg.At != 0 { + hedged, err = newS3ThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) + if err != nil { + return nil, err + } + } + + o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr)) + return o, nil +} + +func newS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { + if hedging { + hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) + if err != nil { + return nil, err + } + + cfg.S3.HTTP.Transport = hedgedTrasport + } + + return bucket.NewClient(ctx, bucket.S3, cfg, component, logger) +} diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 79135abd26d00..7f4046a47d868 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -654,6 +654,10 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr if cfg.CongestionControl.Enabled { s3Cfg.BackoffConfig.MaxRetries = 1 } + + if cfg.UseThanosObjstore { + return aws.NewS3ThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) + } return aws.NewS3ObjectClient(s3Cfg, cfg.Hedging) case types.StorageTypeAlibabaCloud: diff --git a/pkg/util/http.go b/pkg/util/http.go index c3c64ea1e3a86..3fdfca6df24f1 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -298,3 +298,12 @@ func FlagFromValues(values url.Values, key string, d bool) bool { return d } } + +func IsValidURL(endpoint string) bool { + u, err := url.Parse(endpoint) + if err != nil { + return false + } + + return u.Scheme != "" && u.Host != "" +}