Added experimental filesystem bucket implementation (thanos-io#1690)
* Added experimental filesystem bucket implementation

Use cases:
* See: observatorium/thanos-replicate#7
* Local testing, demos

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Fixed edge case.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Disabled one test case. We cannot rely on this.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka authored Oct 29, 2019
1 parent 89576af commit b7f3ac9
Showing 11 changed files with 316 additions and 16 deletions.
20 changes: 19 additions & 1 deletion docs/storage.md
@@ -57,10 +57,11 @@ Current object storage client implementations:
|----------------------|-------------------|-----------|---------------|
| [Google Cloud Storage](./storage.md#gcs) | Stable (production usage) | yes | @bwplotka |
| [AWS/S3](./storage.md#s3) | Stable (production usage) | yes | @bwplotka |
| [Azure Storage Account](./storage.md#azure) | Stable (production usage) | yes | @vglafirov |
| [Azure Storage Account](./storage.md#azure) | Stable (production usage) | no | @vglafirov |
| [OpenStack Swift](./storage.md#openstack-swift) | Beta (working PoCs, testing usage) | no | @sudhi-vm |
| [Tencent COS](./storage.md#tencent-cos) | Beta (testing usage) | no | @jojohappy |
| [AliYun OSS](./storage.md#aliyun-oss) | Beta (testing usage) | no | @shaulboozhiao,@wujinhu |
| [Local Filesystem](./storage.md#filesystem) | Beta (testing usage) | yes | @bwplotka |

NOTE: Currently Thanos requires strong consistency (write-read) from the object store implementation.

@@ -354,3 +355,20 @@ config:
```

Use --objstore.config-file to reference this configuration file.

### Filesystem

This storage type is used when you want to store and access the bucket in the local filesystem.
We treat the filesystem the same way we would treat object storage, so all optimizations for remote buckets apply
even though the files are local.

NOTE: This storage type is experimental and might be inefficient. It is NOT advised to use it as the main storage for metrics
in a production environment; in particular, there is no planned support for distributed filesystems like NFS.
It is mainly useful for testing and demos.

[embedmd]:# (flags/config_bucket_filesystem.txt yaml)
```yaml
type: FILESYSTEM
config:
directory: ""
```
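
For example, pointing the bucket at a local directory (the path below is purely illustrative):

```yaml
type: FILESYSTEM
config:
  directory: "/tmp/thanos/bucket"
```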
1 change: 1 addition & 0 deletions go.sum
@@ -422,6 +422,7 @@ github.com/samuel/go-zookeeper v0.0.0-20190810000440-0ceca61e4d75 h1:cA+Ubq9qEVI
github.com/samuel/go-zookeeper v0.0.0-20190810000440-0ceca61e4d75/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis=
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/satori/go.uuid v0.0.0-20160603004225-b111a074d5ef h1:RoeI7K0oZIcUirMHsFpQjTVDrl1ouNh8T7v3eNsUxL0=
github.com/satori/go.uuid v0.0.0-20160603004225-b111a074d5ef/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
1 change: 1 addition & 0 deletions pkg/compact/compact_e2e_test.go
@@ -156,6 +156,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
sort.Slice(rem, func(i, j int) bool {
return rem[i].Compare(rem[j]) < 0
})

// Only the level-3 block and the last source block in both resolutions should be left.
testutil.Equals(t, []ulid.ULID{metas[9].ULID, m3.ULID, m4.ULID}, rem)

16 changes: 10 additions & 6 deletions pkg/objstore/client/factory.go
@@ -12,6 +12,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/azure"
"github.com/thanos-io/thanos/pkg/objstore/cos"
"github.com/thanos-io/thanos/pkg/objstore/filesystem"
"github.com/thanos-io/thanos/pkg/objstore/gcs"
"github.com/thanos-io/thanos/pkg/objstore/oss"
"github.com/thanos-io/thanos/pkg/objstore/s3"
@@ -22,12 +23,13 @@ import (
type ObjProvider string

const (
GCS ObjProvider = "GCS"
S3 ObjProvider = "S3"
AZURE ObjProvider = "AZURE"
SWIFT ObjProvider = "SWIFT"
COS ObjProvider = "COS"
ALIYUNOSS ObjProvider = "ALIYUNOSS"
FILESYSTEM ObjProvider = "FILESYSTEM"
GCS ObjProvider = "GCS"
S3 ObjProvider = "S3"
AZURE ObjProvider = "AZURE"
SWIFT ObjProvider = "SWIFT"
COS ObjProvider = "COS"
ALIYUNOSS ObjProvider = "ALIYUNOSS"
)

type BucketConfig struct {
@@ -63,6 +65,8 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe
bucket, err = cos.NewBucket(logger, config, component)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
default:
return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type)
}
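With this wiring in place, a caller can resolve the filesystem bucket from the same YAML configuration used for any other provider. A minimal sketch; the directory path is illustrative, and passing a nil prometheus.Registerer is an assumption here:

```go
package main

import (
	stdlog "log"

	"github.com/go-kit/kit/log"
	"github.com/thanos-io/thanos/pkg/objstore/client"
)

func main() {
	conf := []byte(`
type: FILESYSTEM
config:
  directory: "/tmp/thanos/bucket"
`)
	// NewBucket dispatches on the "type" field; FILESYSTEM now reaches
	// filesystem.NewBucketFromConfig. A nil Registerer is assumed to be tolerated.
	bkt, err := client.NewBucket(log.NewNopLogger(), conf, nil, "example")
	if err != nil {
		stdlog.Fatal(err)
	}
	defer func() { _ = bkt.Close() }()
}
```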
210 changes: 210 additions & 0 deletions pkg/objstore/filesystem/filesystem.go
@@ -0,0 +1,210 @@
package filesystem

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/thanos-io/thanos/pkg/objstore"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/runutil"

"github.com/pkg/errors"
)

// Config stores the configuration for storing and accessing blobs in filesystem.
type Config struct {
Directory string `yaml:"directory"`
}

// Bucket implements the objstore.Bucket interface against the filesystem that the binary runs on.
// Methods from Bucket interface are thread-safe. Objects are assumed to be immutable.
// NOTE: It does not follow symbolic links.
type Bucket struct {
rootDir string
}

// NewBucketFromConfig returns a new filesystem.Bucket from config.
func NewBucketFromConfig(conf []byte) (*Bucket, error) {
var c Config
if err := yaml.Unmarshal(conf, &c); err != nil {
return nil, err
}
if c.Directory == "" {
return nil, errors.New("missing directory for filesystem bucket")
}
return NewBucket(c.Directory)
}

// NewBucket returns a new filesystem.Bucket.
func NewBucket(rootDir string) (*Bucket, error) {
absDir, err := filepath.Abs(rootDir)
if err != nil {
return nil, err
}
return &Bucket{rootDir: absDir}, nil
}

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
absDir := filepath.Join(b.rootDir, dir)
info, err := os.Stat(absDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrapf(err, "stat %s", absDir)
}
if !info.IsDir() {
return nil
}

files, err := ioutil.ReadDir(absDir)
if err != nil {
return err
}
for _, file := range files {
name := filepath.Join(dir, file.Name())

if file.IsDir() {
empty, err := isDirEmpty(filepath.Join(absDir, file.Name()))
if err != nil {
return err
}

if empty {
// Skip empty directories.
continue
}
name += objstore.DirDelim
}
if err := f(name); err != nil {
return err
}
}
return nil
}

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.GetRange(ctx, name, 0, -1)
}

type rangeReaderCloser struct {
io.Reader
f *os.File
}

func (r *rangeReaderCloser) Close() error {
return r.f.Close()
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
if name == "" {
return nil, errors.New("object name is empty")
}

file := filepath.Join(b.rootDir, name)
if _, err := os.Stat(file); err != nil {
return nil, errors.Wrapf(err, "stat %s", file)
}

f, err := os.OpenFile(file, os.O_RDONLY, 0666)
if err != nil {
return nil, err
}

if off > 0 {
if _, err := f.Seek(off, 0); err != nil {
// Close the file on a failed seek to avoid leaking the descriptor.
_ = f.Close()
return nil, errors.Wrapf(err, "seek %v", off)
}
}

if length == -1 {
return f, nil
}

return &rangeReaderCloser{Reader: io.LimitReader(f, length), f: f}, nil
}

// Exists checks if the given object exists on the filesystem.
func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
info, err := os.Stat(filepath.Join(b.rootDir, name))
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, errors.Wrapf(err, "stat %s", filepath.Join(b.rootDir, name))
}
return !info.IsDir(), nil
}

// Upload writes the contents of the reader r to a file under the given object name.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (err error) {
file := filepath.Join(b.rootDir, name)
if err := os.MkdirAll(filepath.Dir(file), os.ModePerm); err != nil {
return err
}

f, err := os.Create(file)
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(&err, f, "close")

if _, err := io.Copy(f, r); err != nil {
return errors.Wrapf(err, "copy to %s", file)
}
return nil
}

func isDirEmpty(name string) (ok bool, err error) {
f, err := os.Open(name)
if err != nil {
return false, err
}
defer runutil.CloseWithErrCapture(&err, f, "dir open")

if _, err = f.Readdir(1); err == io.EOF {
return true, nil
}
return false, err
}

// Delete removes the object or directory with the given name (including all data prefixed with it)
// and prunes any parent directories that become empty.
func (b *Bucket) Delete(_ context.Context, name string) error {
file := filepath.Join(b.rootDir, name)
for file != b.rootDir {
if err := os.RemoveAll(file); err != nil {
return errors.Wrapf(err, "rm %s", file)
}
file = filepath.Dir(file)
empty, err := isDirEmpty(file)
if err != nil {
return err
}
if !empty {
break
}
}
return nil
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
return os.IsNotExist(errors.Cause(err))
}

func (b *Bucket) Close() error { return nil }

// Name returns the bucket name.
func (b *Bucket) Name() string {
return fmt.Sprintf("fs: %s", b.rootDir)
}
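
Putting the pieces together, a short usage sketch of the new bucket; the directory and object names below are illustrative, not part of the commit:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"
	"log"

	"github.com/thanos-io/thanos/pkg/objstore/filesystem"
)

func main() {
	bkt, err := filesystem.NewBucket("/tmp/thanos-bucket") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	// Upload creates missing parent directories under the root.
	if err := bkt.Upload(ctx, "id1/obj_1.some", bytes.NewBufferString("test-data@")); err != nil {
		log.Fatal(err)
	}

	// GetRange with length -1 reads from the offset to the end of the object.
	rc, err := bkt.GetRange(ctx, "id1/obj_1.some", 5, -1)
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()
	b, err := ioutil.ReadAll(rc)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b)) // data@

	// Iter lists entries under a prefix; directory names carry a trailing delimiter.
	if err := bkt.Iter(ctx, "", func(name string) error {
		fmt.Println(name) // id1/
		return nil
	}); err != nil {
		log.Fatal(err)
	}

	// Delete removes the object and prunes parent directories left empty.
	if err := bkt.Delete(ctx, "id1/obj_1.some"); err != nil {
		log.Fatal(err)
	}
}
```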
10 changes: 7 additions & 3 deletions pkg/objstore/inmem/inmem.go
@@ -16,8 +16,8 @@ import (

var errNotFound = errors.New("inmem: object not found")

// Bucket implements the store.Bucket and shipper.Bucket interfaces against local memory.
// methods from Bucket interface are thread-safe. Object are assumed to be immutable.
// Bucket implements the objstore.Bucket interface against local memory.
// Methods from Bucket interface are thread-safe. Objects are assumed to be immutable.
type Bucket struct {
mtx sync.RWMutex
objects map[string][]byte
@@ -116,7 +116,11 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
}

if int64(len(file)) < off {
return nil, errors.Errorf("inmem: offset larger than content length. Len %d. Offset: %v", len(file), off)
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}

if length == -1 {
return ioutil.NopCloser(bytes.NewReader(file[off:])), nil
}

if int64(len(file)) <= off+length {
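The edge-case fix above changes GetRange semantics: an offset at or past the end of the object now yields an empty reader rather than an error, and a length of -1 reads to the end. A small sketch of the new behaviour; inmem.NewBucket is assumed to be the package's exported constructor:

```go
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	bkt := inmem.NewBucket() // assumed exported constructor
	ctx := context.Background()
	if err := bkt.Upload(ctx, "obj", strings.NewReader("abc")); err != nil {
		panic(err)
	}

	// Offset beyond the object length: empty reader, no error.
	rc, _ := bkt.GetRange(ctx, "obj", 10, 3)
	b, _ := ioutil.ReadAll(rc)
	fmt.Println(len(b)) // 0

	// Length -1 reads from the offset to the end.
	rc2, _ := bkt.GetRange(ctx, "obj", 1, -1)
	b2, _ := ioutil.ReadAll(rc2)
	fmt.Println(string(b2)) // bc
}
```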
36 changes: 36 additions & 0 deletions pkg/objstore/objtesting/acceptance_e2e_test.go
@@ -49,6 +49,29 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, "tes", string(content))

// Unspecified range with offset.
rcUnspecifiedLen, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, -1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rcUnspecifiedLen.Close()) }()
content, err = ioutil.ReadAll(rcUnspecifiedLen)
testutil.Ok(t, err)
testutil.Equals(t, "test-data@", string(content))

// Out of band offset. Do not rely on the outcome.
// NOTE: The outcome differs between providers:
// * GCS returns a 416 status code.
// * S3 errors immediately with an invalid range error.
// * inmem and filesystem return 0 bytes.
//rcOffset, err := bkt.GetRange(ctx, "id1/obj_1.some", 124141, 3)

// Out of band length. We expect to read the file fully.
rcLength, err := bkt.GetRange(ctx, "id1/obj_1.some", 3, 9999)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rcLength.Close()) }()
content, err = ioutil.ReadAll(rcLength)
testutil.Ok(t, err)
testutil.Equals(t, "st-data@", string(content))

ok, err = bkt.Exists(ctx, "id1/obj_1.some")
testutil.Ok(t, err)
testutil.Assert(t, ok, "expected exists")
@@ -97,6 +120,7 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
}))

testutil.Ok(t, bkt.Delete(ctx, "id1/obj_2.some"))

// Delete is expected to fail on a non-existing object.
// NOTE: Don't rely on this; S3 does not comply with this the way GCS does.
// testutil.NotOk(t, bkt.Delete(ctx, "id1/obj_2.some"))
@@ -108,5 +132,17 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
return nil
}))
testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_3.some"}, seen)

testutil.Ok(t, bkt.Delete(ctx, "id2/obj_4.some"))

seen = []string{}
testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error {
seen = append(seen, fn)
return nil
}))
expected = []string{"obj_5.some", "id1/"}
sort.Strings(expected)
sort.Strings(seen)
testutil.Equals(t, expected, seen)
})
}