Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): AWS backend using thanos.io/objstore #11221

Merged
merged 26 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
43bba36
Groundwork for azure implementation
JoaoBraveCoding Nov 24, 2023
b93fbcd
Fixed configuration for bucket/azure
JoaoBraveCoding Nov 24, 2023
a259abd
Added support for BlobStorage using thanos/objstore
JoaoBraveCoding Nov 24, 2023
42f37ba
Azure CLI review
JoaoBraveCoding Nov 27, 2023
40aa2f9
Fixes from testing
JoaoBraveCoding Dec 7, 2023
e394042
Merge branch 'main' into log-4550-azure
JoaoBraveCoding Mar 14, 2024
02218a4
Merge branch 'main' into log-4550-azure
JoaoBraveCoding Mar 18, 2024
6db3b13
Merge branch 'main' into log-4550-azure
JoaoBraveCoding Apr 22, 2024
4399687
Merge branch 'main' into log-4550-azure
JoaoBraveCoding Jul 1, 2024
1386834
Merge branch 'main' into log-4550-azure
ashwanthgoli Oct 21, 2024
679c2a8
clean-up deleted files from main
ashwanthgoli Oct 21, 2024
06f2280
config parity
ashwanthgoli Oct 21, 2024
8d92fbd
add missing methods to azure thanos adapter
ashwanthgoli Oct 21, 2024
76c41fe
use objectclient adapter
ashwanthgoli Oct 22, 2024
0557d69
lint
ashwanthgoli Oct 22, 2024
a57c140
make format
ashwanthgoli Oct 22, 2024
a7da0f4
remove gcs comment
ashwanthgoli Oct 22, 2024
4337f02
s3: add support for thanos client
ashwanthgoli Oct 22, 2024
b032654
make format
ashwanthgoli Oct 22, 2024
f99e5e7
Update pkg/storage/chunk/client/aws/s3_storage_client.go
ashwanthgoli Oct 24, 2024
c9b0c10
review suggestions
ashwanthgoli Oct 24, 2024
65a0aeb
fixup! review suggestions
ashwanthgoli Oct 25, 2024
5c52572
Merge branch 'main' into log-4550-aws
ashwanthgoli Oct 25, 2024
3cd8596
use retryfunc option
ashwanthgoli Oct 25, 2024
cf4efc4
fix config_test
ashwanthgoli Oct 25, 2024
4752c12
remove signature version. not configurable anymore, defaults to v4
ashwanthgoli Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,37 @@
package azure

import (
"net/http"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/azure"
yaml "gopkg.in/yaml.v2"
)

func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := azure.Config{
StorageAccountName: cfg.StorageAccountName,
StorageAccountKey: cfg.StorageAccountKey.String(),
StorageConnectionString: cfg.ConnectionString.String(),
ContainerName: cfg.ContainerName,
Endpoint: cfg.EndpointSuffix,
MaxRetries: cfg.MaxRetries,
HTTPConfig: azure.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.ResponseHeaderTimeout),
InsecureSkipVerify: cfg.InsecureSkipVerify,
TLSHandshakeTimeout: model.Duration(cfg.TLSHandshakeTimeout),
ExpectContinueTimeout: model.Duration(cfg.ExpectContinueTimeout),
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.MaxConnsPerHost,
},
return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig)
}

func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
// Start with default config to make sure that all parameters are set to sensible values, especially
// HTTP Config field.
bucketConfig := azure.DefaultConfig
bucketConfig.StorageAccountName = cfg.StorageAccountName
bucketConfig.StorageAccountKey = cfg.StorageAccountKey.String()
bucketConfig.StorageConnectionString = cfg.StorageConnectionString.String()
bucketConfig.ContainerName = cfg.ContainerName
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.UserAssignedID = cfg.UserAssignedID

if cfg.Endpoint != "" {
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
bucketConfig.Endpoint = cfg.Endpoint
}

// Thanos currently doesn't support passing the config as is, but expects a YAML,
// so we're going to serialize it.
serialized, err := yaml.Marshal(bucketConfig)
if err != nil {
return nil, err
var rt http.RoundTripper
if cfg.Transport != nil {
rt = cfg.Transport
}

return azure.NewBucket(logger, serialized, name, nil)
return factory(logger, bucketConfig, name, rt)
}
29 changes: 15 additions & 14 deletions pkg/storage/bucket/azure/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package azure

import (
"flag"
"net/http"

"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/v3/pkg/storage/bucket/http"
)

// Config holds the config options for an Azure backend
type Config struct {
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
ConnectionString flagext.Secret `yaml:"connection_string"`
ContainerName string `yaml:"container_name"`
EndpointSuffix string `yaml:"endpoint_suffix"`
MaxRetries int `yaml:"max_retries"`
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
StorageConnectionString flagext.Secret `yaml:"connection_string"`
ContainerName string `yaml:"container_name"`
Endpoint string `yaml:"endpoint_suffix"`
MaxRetries int `yaml:"max_retries"`
UserAssignedID string `yaml:"user_assigned_id"`

http.Config `yaml:"http"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
}

// RegisterFlags registers the flags for Azure storage
Expand All @@ -28,10 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the flags for Azure storage
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key")
f.Var(&cfg.ConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the values of `account-name` and `endpoint-suffix` values will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name")
f.StringVar(&cfg.EndpointSuffix, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key. If unset, Azure managed identities will be used for authentication instead.")
f.Var(&cfg.StorageConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the value of `endpoint-suffix` will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name")
f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN. If set to empty string, default endpoint suffix is used.")
f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors")
cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f)
f.StringVar(&cfg.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned managed identity. If empty, then System assigned identity is used.")
}
98 changes: 0 additions & 98 deletions pkg/storage/bucket/azure/config_test.go

This file was deleted.

143 changes: 143 additions & 0 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package bucket

import (
"context"
"io"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk/client"
)

type ObjectClientAdapter struct {
bucket, hedgedBucket objstore.Bucket
logger log.Logger
isRetryableErr func(err error) bool
}

func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger) *ObjectClientAdapter {
if hedgedBucket == nil {
hedgedBucket = bucket
}

return &ObjectClientAdapter{
bucket: bucket,
hedgedBucket: hedgedBucket,
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
// default to no retryable errors. Override with WithRetryableErrFunc
isRetryableErr: func(_ error) bool {
return false
},
}
}

func WithRetryableErrFunc(f func(err error) bool) func(*ObjectClientAdapter) {
return func(o *ObjectClientAdapter) {
o.isRetryableErr = f
}
}

func (o *ObjectClientAdapter) Stop() {
}

// ObjectExists checks if a given objectKey exists in the bucket
func (o *ObjectClientAdapter) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return o.bucket.Exists(ctx, objectKey)
}

// GetAttributes returns the attributes of the specified object key from the configured bucket.
func (o *ObjectClientAdapter) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
attr := client.ObjectAttributes{}
thanosAttr, err := o.hedgedBucket.Attributes(ctx, objectKey)
if err != nil {
return attr, err
}

attr.Size = thanosAttr.Size
return attr, nil
}

// PutObject puts the specified bytes into the configured bucket at the provided key
func (o *ObjectClientAdapter) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return o.bucket.Upload(ctx, objectKey, object)
}

// GetObject returns a reader and the size for the specified object key from the configured bucket.
// size is set to -1 if it cannot be succefully determined, it is up to the caller to check this value before using it.
func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := o.hedgedBucket.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

size, err := objstore.TryToGetSize(reader)
if err != nil {
size = -1
level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err)
}

return reader, size, err
}

func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
return o.hedgedBucket.GetRange(ctx, objectKey, offset, length)
}

// List objects with given prefix.
func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var iterParams []objstore.IterOption

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
}

err := o.bucket.Iter(ctx, prefix, func(objectKey string) error {
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}

// TODO: remove this once thanos support IterWithAttributes
attr, err := o.bucket.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
})

return nil
}, iterParams...)
if err != nil {
return nil, nil, err
}

return storageObjects, commonPrefixes, nil
}

// DeleteObject deletes the specified object key from the configured bucket.
func (o *ObjectClientAdapter) DeleteObject(ctx context.Context, objectKey string) error {
return o.bucket.Delete(ctx, objectKey)
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
func (o *ObjectClientAdapter) IsObjectNotFoundErr(err error) bool {
return o.bucket.IsObjNotFoundErr(err)
}

// IsRetryableErr returns true if the request failed due to some retryable server-side scenario
func (o *ObjectClientAdapter) IsRetryableErr(err error) bool {
return o.isRetryableErr(err)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gcp
package bucket

import (
"bytes"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
)

func TestGCSThanosObjStore_List(t *testing.T) {
func TestObjectClientAdapter_List(t *testing.T) {
tests := []struct {
name string
prefix string
Expand Down Expand Up @@ -95,10 +95,10 @@ func TestGCSThanosObjStore_List(t *testing.T) {
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

gcpClient := &GCSThanosObjectClient{}
gcpClient.client = newBucket
client := NewObjectClientAdapter(newBucket, nil, nil)
client.bucket = newBucket

storageObj, storageCommonPref, err := gcpClient.List(context.Background(), tt.prefix, tt.delimiter)
storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
require.Equal(t, tt.wantErr.Error(), err.Error())
continue
Expand Down
Loading
Loading