Skip to content

Commit

Permalink
Add Provider() method to objstore.Client
Browse files Browse the repository at this point in the history
This commit adds a Provider() method to the objstore.Client interface.
The method is useful when the client was created from a config file using
the client factory.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Nov 18, 2024
1 parent 2c4ff97 commit 49d8d1c
Show file tree
Hide file tree
Showing 20 changed files with 92 additions and 53 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m

The core this module is the [`Bucket` interface](objstore.go):

```go mdox-exec="sed -n '39,55p' objstore.go"
```go mdox-exec="sed -n '55,73p' objstore.go"
// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
io.Closer
BucketReader

Provider() ObjProvider

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader) error
Expand All @@ -70,7 +72,7 @@ type Bucket interface {

All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go):

```go mdox-exec="sed -n '71,106p' objstore.go"
```go mdox-exec="sed -n '89,124p' objstore.go"
// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
Expand Down
41 changes: 13 additions & 28 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,10 @@ import (
"gopkg.in/yaml.v2"
)

type ObjProvider string

const (
FILESYSTEM ObjProvider = "FILESYSTEM"
GCS ObjProvider = "GCS"
S3 ObjProvider = "S3"
AZURE ObjProvider = "AZURE"
SWIFT ObjProvider = "SWIFT"
COS ObjProvider = "COS"
ALIYUNOSS ObjProvider = "ALIYUNOSS"
BOS ObjProvider = "BOS"
OCI ObjProvider = "OCI"
OBS ObjProvider = "OBS"
)

type BucketConfig struct {
Type ObjProvider `yaml:"type"`
Config interface{} `yaml:"config"`
Prefix string `yaml:"prefix" default:""`
Type objstore.ObjProvider `yaml:"type"`
Config interface{} `yaml:"config"`
Prefix string `yaml:"prefix" default:""`
}

// NewBucket initializes and returns new object storage clients.
Expand All @@ -64,25 +49,25 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string, wrap

var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
case string(objstore.GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, wrapRoundtripper)
case string(S3):
case string(objstore.S3):
bucket, err = s3.NewBucket(logger, config, component, wrapRoundtripper)
case string(AZURE):
case string(objstore.AZURE):
bucket, err = azure.NewBucket(logger, config, component, wrapRoundtripper)
case string(SWIFT):
case string(objstore.SWIFT):
bucket, err = swift.NewContainer(logger, config, wrapRoundtripper)
case string(COS):
case string(objstore.COS):
bucket, err = cos.NewBucket(logger, config, component, wrapRoundtripper)
case string(ALIYUNOSS):
case string(objstore.ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component, wrapRoundtripper)
case string(FILESYSTEM):
case string(objstore.FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
case string(objstore.BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
case string(objstore.OCI):
bucket, err = oci.NewBucket(logger, config, wrapRoundtripper)
case string(OBS):
case string(objstore.OBS):
bucket, err = obs.NewBucket(logger, config)
default:
return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type)
Expand Down
2 changes: 2 additions & 0 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func NewInMemBucket() *InMemBucket {
}
}

func (b *InMemBucket) Provider() ObjProvider { return MEMORY }

// Objects returns a copy of the internally stored objects.
// NOTE: For assert purposes.
func (b *InMemBucket) Objects() map[string][]byte {
Expand Down
22 changes: 22 additions & 0 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ import (
"golang.org/x/sync/errgroup"
)

type ObjProvider string

const (
MEMORY ObjProvider = "MEMORY"
FILESYSTEM ObjProvider = "FILESYSTEM"
GCS ObjProvider = "GCS"
S3 ObjProvider = "S3"
AZURE ObjProvider = "AZURE"
SWIFT ObjProvider = "SWIFT"
COS ObjProvider = "COS"
ALIYUNOSS ObjProvider = "ALIYUNOSS"
BOS ObjProvider = "BOS"
OCI ObjProvider = "OCI"
OBS ObjProvider = "OBS"
)

const (
OpIter = "iter"
OpGet = "get"
Expand All @@ -42,6 +58,8 @@ type Bucket interface {
io.Closer
BucketReader

Provider() ObjProvider

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader) error
Expand Down Expand Up @@ -583,6 +601,10 @@ type metricBucket struct {
metrics *Metrics
}

func (b *metricBucket) Provider() ObjProvider {
return b.bkt.Provider()
}

func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
return &metricBucket{
bkt: b.bkt,
Expand Down
21 changes: 10 additions & 11 deletions objtesting/foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/objstore/providers/azure"
"github.com/thanos-io/objstore/providers/bos"
"github.com/thanos-io/objstore/providers/cos"
Expand All @@ -26,7 +25,7 @@ import (

// IsObjStoreSkipped returns true if given provider ID is found in THANOS_TEST_OBJSTORE_SKIP array delimited by comma e.g:
// THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI.
func IsObjStoreSkipped(t *testing.T, provider client.ObjProvider) bool {
func IsObjStoreSkipped(t *testing.T, provider objstore.ObjProvider) bool {
if e, ok := os.LookupEnv("THANOS_TEST_OBJSTORE_SKIP"); ok {
obstores := strings.Split(e, ",")
for _, objstore := range obstores {
Expand Down Expand Up @@ -69,7 +68,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
})

// Optional GCS.
if !IsObjStoreSkipped(t, client.GCS) {
if !IsObjStoreSkipped(t, objstore.GCS) {
t.Run("gcs", func(t *testing.T) {
bkt, closeFn, err := gcs.NewTestBucket(t, os.Getenv("GCP_PROJECT"))
testutil.Ok(t, err)
Expand All @@ -84,7 +83,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional S3.
if !IsObjStoreSkipped(t, client.S3) {
if !IsObjStoreSkipped(t, objstore.S3) {
t.Run("aws s3", func(t *testing.T) {
// TODO(bwplotka): Allow taking location from envvar.
bkt, closeFn, err := s3.NewTestBucket(t, "us-west-2")
Expand All @@ -103,7 +102,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional Azure.
if !IsObjStoreSkipped(t, client.AZURE) {
if !IsObjStoreSkipped(t, objstore.AZURE) {
t.Run("azure", func(t *testing.T) {
bkt, closeFn, err := azure.NewTestBucket(t, "e2e-tests")
testutil.Ok(t, err)
Expand All @@ -117,7 +116,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional SWIFT.
if !IsObjStoreSkipped(t, client.SWIFT) {
if !IsObjStoreSkipped(t, objstore.SWIFT) {
t.Run("swift", func(t *testing.T) {
container, closeFn, err := swift.NewTestContainer(t)
testutil.Ok(t, err)
Expand All @@ -131,7 +130,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional COS.
if !IsObjStoreSkipped(t, client.COS) {
if !IsObjStoreSkipped(t, objstore.COS) {
t.Run("Tencent cos", func(t *testing.T) {
bkt, closeFn, err := cos.NewTestBucket(t)
testutil.Ok(t, err)
Expand All @@ -145,7 +144,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional OSS.
if !IsObjStoreSkipped(t, client.ALIYUNOSS) {
if !IsObjStoreSkipped(t, objstore.ALIYUNOSS) {
t.Run("AliYun oss", func(t *testing.T) {
bkt, closeFn, err := oss.NewTestBucket(t)
testutil.Ok(t, err)
Expand All @@ -159,7 +158,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional BOS.
if !IsObjStoreSkipped(t, client.BOS) {
if !IsObjStoreSkipped(t, objstore.BOS) {
t.Run("Baidu BOS", func(t *testing.T) {
bkt, closeFn, err := bos.NewTestBucket(t)
testutil.Ok(t, err)
Expand All @@ -173,7 +172,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional OCI.
if !IsObjStoreSkipped(t, client.OCI) {
if !IsObjStoreSkipped(t, objstore.OCI) {
t.Run("oci", func(t *testing.T) {
bkt, closeFn, err := oci.NewTestBucket(t)
testutil.Ok(t, err)
Expand All @@ -186,7 +185,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
}

// Optional OBS.
if !IsObjStoreSkipped(t, client.OBS) {
if !IsObjStoreSkipped(t, objstore.OBS) {
t.Run("obs", func(t *testing.T) {
bkt, closeFn, err := obs.NewTestBucket(t, "cn-south-1")
testutil.Ok(t, err)
Expand Down
4 changes: 3 additions & 1 deletion prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func withPrefix(prefix, name string) string {
return prefix + DirDelim + name
}

func (p *PrefixedBucket) Provider() ObjProvider { return p.bkt.Provider() }

func (p *PrefixedBucket) Close() error {
return p.bkt.Close()
}
Expand Down Expand Up @@ -93,7 +95,7 @@ func (p *PrefixedBucket) IsAccessDeniedErr(err error) bool {
}

// Attributes returns information about the specified object.
func (p PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
return p.bkt.Attributes(ctx, conditionalPrefix(p.prefix, name))
}

Expand Down
2 changes: 2 additions & 0 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR
return bkt, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.AZURE }

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
}
Expand Down
2 changes: 2 additions & 0 deletions providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
return bkt, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.BOS }

// Name returns the bucket name for the provider.
func (b *Bucket) Name() string {
return b.name
Expand Down
2 changes: 2 additions & 0 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra
return bkt, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.COS }

// Name returns the bucket name for COS.
func (b *Bucket) Name() string {
return b.name
Expand Down
2 changes: 2 additions & 0 deletions providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func NewBucket(rootDir string) (*Bucket, error) {
return &Bucket{rootDir: absDir}, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.FILESYSTEM }

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
}
Expand Down
2 changes: 2 additions & 0 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func newBucket(ctx context.Context, logger log.Logger, gc Config, opts []option.
return bkt, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.GCS }

// Name returns the bucket name for gcs.
func (b *Bucket) Name() string {
return b.name
Expand Down
2 changes: 2 additions & 0 deletions providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func NewBucketWithConfig(logger log.Logger, config Config) (*Bucket, error) {
return bkt, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.OBS }

// Name returns the bucket name for the provider.
func (b *Bucket) Name() string {
return b.name
Expand Down
2 changes: 2 additions & 0 deletions providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type Bucket struct {
requestMetadata common.RequestMetadata
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.OCI }

// Name returns the bucket name for the provider.
func (b *Bucket) Name() string {
return b.name
Expand Down
2 changes: 2 additions & 0 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return NewTestBucketFromConfig(t, c, false)
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.ALIYUNOSS }

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
Expand Down
2 changes: 2 additions & 0 deletions providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra
return bkt, nil
}

func (b *Bucket) Provider() objstore.ObjProvider { return objstore.S3 }

// Name returns the bucket name for s3.
func (b *Bucket) Name() string {
return b.name
Expand Down
2 changes: 2 additions & 0 deletions providers/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ func NewContainerFromConfig(logger log.Logger, sc *Config, createContainer bool,
}, nil
}

func (c *Container) Provider() objstore.ObjProvider { return objstore.SWIFT }

// Name returns the container name for swift.
func (c *Container) Name() string {
return c.name
Expand Down
23 changes: 12 additions & 11 deletions scripts/cfggen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main

import (
"fmt"
"github.com/thanos-io/objstore"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -35,17 +36,17 @@ var (
configs map[string]interface{}
possibleValues []string

bucketConfigs = map[client.ObjProvider]interface{}{
client.AZURE: azure.Config{},
client.GCS: gcs.Config{},
client.S3: s3.DefaultConfig,
client.SWIFT: swift.DefaultConfig,
client.COS: cos.DefaultConfig,
client.ALIYUNOSS: oss.Config{},
client.FILESYSTEM: filesystem.Config{},
client.BOS: bos.Config{},
client.OCI: oci.Config{},
client.OBS: obs.DefaultConfig,
bucketConfigs = map[objstore.ObjProvider]interface{}{
objstore.AZURE: azure.Config{},
objstore.GCS: gcs.Config{},
objstore.S3: s3.DefaultConfig,
objstore.SWIFT: swift.DefaultConfig,
objstore.COS: cos.DefaultConfig,
objstore.ALIYUNOSS: oss.Config{},
objstore.FILESYSTEM: filesystem.Config{},
objstore.BOS: bos.Config{},
objstore.OCI: oci.Config{},
objstore.OBS: obs.DefaultConfig,
}
)

Expand Down
2 changes: 2 additions & 0 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ func WithDelay(bkt Bucket, delay time.Duration) Bucket {
return &delayingBucket{bkt: bkt, delay: delay}
}

func (d *delayingBucket) Provider() ObjProvider { return d.bkt.Provider() }

func (d *delayingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
time.Sleep(d.delay)
return d.bkt.Get(ctx, name)
Expand Down
Loading

0 comments on commit 49d8d1c

Please sign in to comment.