Skip to content

Commit

Permalink
Merge pull request #63 from fpetkovski/iter-object-attributes
Browse files Browse the repository at this point in the history
Add support for object attributes in Iter call
  • Loading branch information
fpetkovski authored Nov 5, 2024
2 parents 8648e8e + 2c4ff97 commit b598dce
Show file tree
Hide file tree
Showing 20 changed files with 534 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations.

### Added
- [#63](https://github.com/thanos-io/objstore/pull/63) Implement an `IterWithAttributes` method on the bucket client.
- [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support.
- [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class.
- [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials.
Expand Down
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m

The core of this module is the [`Bucket` interface](objstore.go):

```go mdox-exec="sed -n '37,50p' objstore.go"
```go mdox-exec="sed -n '39,55p' objstore.go"
// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
Expand All @@ -63,18 +63,31 @@ type Bucket interface {
// If object does not exist in the moment of deletion, Delete should throw error.
Delete(ctx context.Context, name string) error

// Name returns the bucket name for the provider.
Name() string
}
```

All [provider implementations](providers) have to implement the `Bucket` interface, which allows common read and write operations that are supported by all object providers. If you want to limit the code that performs bucket operations to read-only access (a smart idea, as it allows limiting access permissions), you can use the [`BucketReader` interface](objstore.go):

```go mdox-exec="sed -n '68,93p' objstore.go"

```go mdox-exec="sed -n '71,106p' objstore.go"
// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.

// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
SupportedIterOptions() []IterOptionType

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -374,6 +387,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -447,6 +461,7 @@ config:
storage_account: ""
storage_account_key: ""
storage_connection_string: ""
storage_create_container: false
container: ""
endpoint: ""
user_assigned_id: ""
Expand Down
14 changes: 14 additions & 0 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error,
return nil
}

// SupportedIterOptions returns the IterOption types accepted by the in-memory
// bucket. Only Recursive is listed.
func (b *InMemBucket) SupportedIterOptions() []IterOptionType {
	return []IterOptionType{Recursive}
}

// IterWithAttributes validates the requested options against
// SupportedIterOptions and then delegates to Iter, handing each listed name to
// f wrapped in an IterObjectAttributes value. Only the Name field is populated.
func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
	supported := b.SupportedIterOptions()
	if err := ValidateIterOptions(supported, options...); err != nil {
		return err
	}

	// Adapt the attribute-based callback to the name-based callback Iter expects.
	forward := func(name string) error {
		return f(IterObjectAttributes{Name: name})
	}
	return b.Iter(ctx, dir, forward, options...)
}

// Get returns a reader for the given object name.
func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
if name == "" {
Expand Down
109 changes: 100 additions & 9 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package objstore
import (
"bytes"
"context"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -70,8 +72,19 @@ type InstrumentedBucket interface {
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.

// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
SupportedIterOptions() []IterOptionType

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface {
ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

// ErrOptionNotSupported is the sentinel error wrapped by ValidateIterOptions
// when a requested IterOption type is not in the supported list.
var ErrOptionNotSupported = errors.New("iter option is not supported")

// IterOptionType is used for type-safe option support checking.
type IterOptionType int

const (
// Recursive is the type of the option returned by WithRecursiveIter.
Recursive IterOptionType = iota
// UpdatedAt is the type of the option returned by WithUpdatedAt.
UpdatedAt
)

// IterOption configures the provided params.
type IterOption func(params *IterParams)
type IterOption struct {
Type IterOptionType
Apply func(params *IterParams)
}

// WithRecursiveIter is an option that can be applied to Iter() to recursively list objects
// in the bucket.
func WithRecursiveIter(params *IterParams) {
params.Recursive = true
func WithRecursiveIter() IterOption {
return IterOption{
Type: Recursive,
Apply: func(params *IterParams) {
params.Recursive = true
},
}
}

// WithUpdatedAt returns an IterOption of type UpdatedAt which, when applied,
// sets IterParams.LastModified so that the last modified time is included in
// the attributes passed to the Iter() callback.
// NB: Prefixes may not report last modified time.
// This option is currently supported for the azure, s3, bos, gcs and filesystem providers.
func WithUpdatedAt() IterOption {
	requestLastModified := func(params *IterParams) {
		params.LastModified = true
	}
	return IterOption{
		Apply: requestLastModified,
		Type:  UpdatedAt,
	}
}

// IterParams holds the Iter() parameters and is used by objstore clients implementations.
type IterParams struct {
Recursive bool
Recursive bool
LastModified bool
}

// ValidateIterOptions checks that the type of every option in options appears
// in supportedOptions. It returns nil when all options are supported, and
// otherwise an error wrapping ErrOptionNotSupported that names the first
// unsupported option type encountered.
func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error {
	for idx := range options {
		requested := options[idx].Type
		if slices.Contains(supportedOptions, requested) {
			continue
		}
		return fmt.Errorf("%w: %v", ErrOptionNotSupported, requested)
	}

	return nil
}

func ApplyIterOptions(options ...IterOption) IterParams {
out := IterParams{}
for _, opt := range options {
opt(&out)
opt.Apply(&out)
}
return out
}
Expand Down Expand Up @@ -189,6 +244,20 @@ type ObjectAttributes struct {
LastModified time.Time `json:"last_modified"`
}

// IterObjectAttributes is the value passed to the IterWithAttributes callback.
// It carries the entry name and, when set, its last modified time.
type IterObjectAttributes struct {
	// Name is the name of the listed entry.
	Name string
	// lastModified is left as the zero time until SetLastModified is called.
	lastModified time.Time
}

// SetLastModified stores t as the last modified timestamp of the entry.
func (a *IterObjectAttributes) SetLastModified(t time.Time) {
	a.lastModified = t
}

// LastModified returns the timestamp the object was last modified. The boolean
// is false if the timestamp is not available, i.e. the stored time is zero.
func (a *IterObjectAttributes) LastModified() (time.Time, bool) {
	if a.lastModified.IsZero() {
		return a.lastModified, false
	}
	return a.lastModified, true
}

// TryToGetSize tries to get upfront size from reader.
// Some implementations may return only size of unread data in the reader, so it's best to call this method before
// doing any reading.
Expand Down Expand Up @@ -533,21 +602,43 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
return b.WithExpectedErrs(fn)
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
const op = OpIter
b.metrics.ops.WithLabelValues(op).Inc()

start := time.Now()
timer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op))
defer timer.ObserveDuration()

err := b.bkt.Iter(ctx, dir, f, options...)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
}
b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
return err
}

// IterWithAttributes delegates to the wrapped bucket's IterWithAttributes while
// recording metrics under the OpIter label: the operation counter is
// incremented, the duration is observed on return, and the failure counter is
// incremented when the call fails with an error that is not expected and the
// context was not canceled.
func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
	const op = OpIter
	b.metrics.ops.WithLabelValues(op).Inc()

	durationTimer := prometheus.NewTimer(b.metrics.opsDuration.WithLabelValues(op))
	defer durationTimer.ObserveDuration()

	err := b.bkt.IterWithAttributes(ctx, dir, f, options...)
	if err == nil {
		return nil
	}

	// Expected failures and caller-side cancellation are not counted as failures.
	if b.metrics.isOpFailureExpected(err) {
		return err
	}
	if ctx.Err() == context.Canceled {
		return err
	}

	b.metrics.opsFailures.WithLabelValues(op).Inc()
	return err
}

// SupportedIterOptions reports the option types supported by the wrapped bucket.
func (b *metricBucket) SupportedIterOptions() []IterOptionType {
	supported := b.bkt.SupportedIterOptions()
	return supported
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = OpAttributes
b.metrics.ops.WithLabelValues(op).Inc()
Expand Down
13 changes: 13 additions & 0 deletions prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er
}, options...)
}

// IterWithAttributes iterates the wrapped bucket over the directory produced by
// withPrefix(p.prefix, dir). Before each entry is handed to f, the leading
// p.prefix+DirDelim is trimmed from its Name.
func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
	stripPrefix := func(attrs IterObjectAttributes) error {
		attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim)
		return f(attrs)
	}

	return p.bkt.IterWithAttributes(ctx, withPrefix(p.prefix, dir), stripPrefix, options...)
}

// SupportedIterOptions reports the option types supported by the wrapped bucket.
func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType {
	supported := p.bkt.SupportedIterOptions()
	return supported
}

// Get returns a reader for the given object name.
func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name))
Expand Down
2 changes: 1 addition & 1 deletion prefixed_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
seen = append(seen, fn)
return nil
}, WithRecursiveIter))
}, WithRecursiveIter()))
expected := []string{"dir/file1.jpg", "file1.jpg"}
sort.Strings(expected)
sort.Strings(seen)
Expand Down
47 changes: 41 additions & 6 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR
return bkt, nil
}

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
// SupportedIterOptions returns the IterOption types this bucket accepts:
// Recursive and UpdatedAt.
func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
	supported := []objstore.IterOptionType{
		objstore.Recursive,
		objstore.UpdatedAt,
	}
	return supported
}

func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
}

prefix := dir
if prefix != "" && !strings.HasSuffix(prefix, DirDelim) {
prefix += DirDelim
Expand All @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blob := range resp.Segment.BlobItems {
if err := f(*blob.Name); err != nil {
attrs := objstore.IterObjectAttributes{
Name: *blob.Name,
}
if params.LastModified {
attrs.SetLastModified(*blob.Properties.LastModified)
}
if err := f(attrs); err != nil {
return err
}
}
Expand All @@ -227,19 +239,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blobItem := range resp.Segment.BlobItems {
if err := f(*blobItem.Name); err != nil {
attrs := objstore.IterObjectAttributes{
Name: *blobItem.Name,
}
if params.LastModified {
attrs.SetLastModified(*blobItem.Properties.LastModified)
}
if err := f(attrs); err != nil {
return err
}
}
for _, blobPrefix := range resp.Segment.BlobPrefixes {
if err := f(*blobPrefix.Name); err != nil {
if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil {
return err
}
}
}
return nil
}

// Iter calls f for each entry in the given directory. The argument to f is the
// full object name including the prefix of the inspected directory.
//
// It is implemented on top of IterWithAttributes, passing only the entry name
// on to f.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
	// Attributes are not used here, so forward at most the first recursive
	// option and drop everything else.
	var recursiveOnly []objstore.IterOption
	for idx := range opts {
		if opts[idx].Type != objstore.Recursive {
			continue
		}
		recursiveOnly = append(recursiveOnly, opts[idx])
		break
	}

	nameOnly := func(attrs objstore.IterObjectAttributes) error {
		return f(attrs.Name)
	}
	return b.IterWithAttributes(ctx, dir, nameOnly, recursiveOnly...)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
if err == nil {
Expand Down
Loading

0 comments on commit b598dce

Please sign in to comment.