Skip to content

Commit

Permalink
decomposed fs: change blobstore layout (#2763)
Browse files Browse the repository at this point in the history
* change decomposed fs blobstore layout for oCIS and S3NG

* update mocks

* add changelog

* fix blobstore tests

* fix sonarcloud code fence language
  • Loading branch information
wkloucek authored Apr 22, 2022
1 parent a431b62 commit c437e57
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 89 deletions.
80 changes: 80 additions & 0 deletions changelog/unreleased/change-ocis-s3ng-fs-blob-layout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
Change: Change the oCIS and S3NG storage driver blob store layout

We've optimized the oCIS and S3NG storage driver blob store layout.

For the oCIS storage driver, blobs will now be stored inside the folder
of a space, next to the nodes. This allows admins to easily archive, back up and restore
spaces as a whole with UNIX tooling. We also moved from a single folder for blobs to
multiple folders for blobs, to make the filesystem interactions more performant for
large numbers of files.

The previous layout on disk looked like this:

```markdown
|-- spaces
| |-- ..
| | |-- ..
| |-- xx
| |-- xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx <- partitioned space id
| |-- nodes
| |-- ..
| |-- xx
| |-- xx
| |-- xx
| |-- xx
| |-- -xxxx-xxxx-xxxx-xxxxxxxxxxxx <- partitioned node id
|-- blobs
|-- ..
|-- xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx <- blob id
```

Now it looks like this:

```markdown
|-- spaces
| |-- ..
| | |-- ..
|-- xx
|-- xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx <- partitioned space id
|-- nodes
| |-- ..
| |-- xx
| |-- xx
| |-- xx
| |-- xx
| |-- -xxxx-xxxx-xxxx-xxxxxxxxxxxx <- partitioned node id
|-- blobs
|-- ..
|-- xx
|-- xx
|-- xx
|-- xx
|-- -xxxx-xxxx-xxxx-xxxxxxxxxxxx <- partitioned blob id
```

For the S3NG storage driver, blobs will now be prefixed with the space id and
also a part of the blob id will be used as prefix. This creates a better prefix partitioning
and mitigates S3 api performance drops for large buckets (https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/).

The previous S3 bucket (blobs only) looked like this:

```markdown
|-- ..
|-- xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx <- blob id
```

Now it looks like this:

```markdown
|-- ..
|-- xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx <- space id
|-- ..
|-- xx
|-- xx
|-- xx
|-- xx
|-- -xxxx-xxxx-xxxx-xxxxxxxxxxxx <- partitioned blob id
```

https://github.com/cs3org/reva/pull/2763
https://github.com/owncloud/ocis/issues/3557
37 changes: 25 additions & 12 deletions pkg/storage/fs/ocis/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/pkg/errors"
)

Expand All @@ -45,39 +47,50 @@ func New(root string) (*Blobstore, error) {
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(key string, data io.Reader) error {
f, err := os.OpenFile(bs.path(key), os.O_CREATE|os.O_WRONLY, 0700)
func (bs *Blobstore) Upload(node *node.Node, data io.Reader) error {

// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(bs.path(node)), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
}

f, err := os.OpenFile(bs.path(node), os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", key)
return errors.Wrapf(err, "could not open blob '%s' for writing", bs.path(node))
}

w := bufio.NewWriter(f)
_, err = w.ReadFrom(data)
if err != nil {
return errors.Wrapf(err, "could not write blob '%s'", key)
return errors.Wrapf(err, "could not write blob '%s'", bs.path(node))
}

return w.Flush()
}

// Download retrieves a blob from the blobstore for reading
func (bs *Blobstore) Download(key string) (io.ReadCloser, error) {
file, err := os.Open(bs.path(key))
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
file, err := os.Open(bs.path(node))
if err != nil {
return nil, errors.Wrapf(err, "could not read blob '%s'", key)
return nil, errors.Wrapf(err, "could not read blob '%s'", bs.path(node))
}
return file, nil
}

// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(key string) error {
err := os.Remove(bs.path(key))
func (bs *Blobstore) Delete(node *node.Node) error {
err := os.Remove(bs.path(node))
if err != nil {
return errors.Wrapf(err, "could not delete blob '%s'", key)
return errors.Wrapf(err, "could not delete blob '%s'", bs.path(node))
}
return nil
}

func (bs *Blobstore) path(key string) string {
return filepath.Join(bs.root, filepath.Clean(filepath.Join("/", key)))
// path builds the on-disk location of a node's blob. Blobs live inside the
// space's folder (next to the nodes), under a partitioned "blobs" subtree.
func (bs *Blobstore) path(node *node.Node) string {
	// Build the relative part rooted at "/" and clean it, so a crafted
	// space or blob id cannot traverse out of the blobstore root.
	rel := filepath.Join("/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2))
	return filepath.Join(bs.root, filepath.Clean(rel))
}
23 changes: 14 additions & 9 deletions pkg/storage/fs/ocis/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path"

"github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/tests/helpers"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -34,7 +35,7 @@ import (
var _ = Describe("Blobstore", func() {
var (
tmpRoot string
key string
blobNode *node.Node
blobPath string
data []byte

Expand All @@ -47,10 +48,13 @@ var _ = Describe("Blobstore", func() {
Expect(err).ToNot(HaveOccurred())

data = []byte("1234567890")
key = "foo"
blobPath = path.Join(tmpRoot, "blobs", key)
blobNode = &node.Node{
SpaceID: "wonderfullspace",
BlobID: "huuuuugeblob",
}
blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob")

bs, err = blobstore.New(path.Join(tmpRoot, "blobs"))
bs, err = blobstore.New(path.Join(tmpRoot))
Expect(err).ToNot(HaveOccurred())
})

Expand All @@ -61,13 +65,13 @@ var _ = Describe("Blobstore", func() {
})

It("creates the root directory if it doesn't exist", func() {
_, err := os.Stat(path.Join(tmpRoot, "blobs"))
_, err := os.Stat(path.Join(tmpRoot))
Expect(err).ToNot(HaveOccurred())
})

Describe("Upload", func() {
It("writes the blob", func() {
err := bs.Upload(key, bytes.NewReader(data))
err := bs.Upload(blobNode, bytes.NewReader(data))
Expect(err).ToNot(HaveOccurred())

writtenBytes, err := ioutil.ReadFile(blobPath)
Expand All @@ -78,12 +82,13 @@ var _ = Describe("Blobstore", func() {

Context("with an existing blob", func() {
BeforeEach(func() {
Expect(os.MkdirAll(path.Dir(blobPath), 0700)).To(Succeed())
Expect(ioutil.WriteFile(blobPath, data, 0700)).To(Succeed())
})

Describe("Download", func() {
It("cleans the key", func() {
reader, err := bs.Download("../" + key)
reader, err := bs.Download(blobNode)
Expect(err).ToNot(HaveOccurred())

readData, err := ioutil.ReadAll(reader)
Expand All @@ -92,7 +97,7 @@ var _ = Describe("Blobstore", func() {
})

It("returns a reader to the blob", func() {
reader, err := bs.Download(key)
reader, err := bs.Download(blobNode)
Expect(err).ToNot(HaveOccurred())

readData, err := ioutil.ReadAll(reader)
Expand All @@ -106,7 +111,7 @@ var _ = Describe("Blobstore", func() {
_, err := os.Stat(blobPath)
Expect(err).ToNot(HaveOccurred())

err = bs.Delete(key)
err = bs.Delete(blobNode)
Expect(err).ToNot(HaveOccurred())

_, err = os.Stat(blobPath)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/fs/ocis/ocis.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func New(m map[string]interface{}) (storage.FS, error) {
return nil, err
}

bs, err := blobstore.New(path.Join(o.Root, "blobs"))
bs, err := blobstore.New(path.Join(o.Root))
if err != nil {
return nil, err
}
Expand Down
35 changes: 25 additions & 10 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"io"
"net/url"
"os"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
Expand Down Expand Up @@ -60,38 +63,50 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(key string, reader io.Reader) error {
func (bs *Blobstore) Upload(node *node.Node, reader io.Reader) error {
size := int64(-1)
if file, ok := reader.(*os.File); ok {
info, err := file.Stat()
if err != nil {
return errors.Wrapf(err, "could not determine file size for object '%s'", key)
return errors.Wrapf(err, "could not determine file size for object '%s'", bs.path(node))
}
size = info.Size()
}

_, err := bs.client.PutObject(context.Background(), bs.bucket, key, reader, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
_, err := bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})

if err != nil {
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", key, bs.bucket)
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket)
}
return nil
}

// Download retrieves a blob from the blobstore for reading
func (bs *Blobstore) Download(key string) (io.ReadCloser, error) {
reader, err := bs.client.GetObject(context.Background(), bs.bucket, key, minio.GetObjectOptions{})
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.path(node), minio.GetObjectOptions{})
if err != nil {
return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", key, bs.bucket)
return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.path(node), bs.bucket)
}
return reader, nil
}

// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(key string) error {
err := bs.client.RemoveObject(context.Background(), bs.bucket, key, minio.RemoveObjectOptions{})
func (bs *Blobstore) Delete(node *node.Node) error {
err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.path(node), minio.RemoveObjectOptions{})
if err != nil {
return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", key, bs.bucket)
return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.path(node), bs.bucket)
}
return nil
}

// path returns the in-bucket object key for a node's blob.
//
// https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/
// Prefixes are used to partition a bucket. A prefix is everything except the
// filename: for an object `BucketName/foo/bar/lorem.ipsum`,
// `BucketName/foo/bar/` is the prefix. S3 applies request limits per prefix,
// so spreading objects over many prefixes improves throughput; there is no
// limit on prefixes per bucket, so in general more prefixes are better than
// fewer. The blob id is therefore partitioned into nested segments.
//
// The space id is identical for every blob of a space, so partitioning it
// would not create additional prefixes and is skipped.
func (bs *Blobstore) path(node *node.Node) string {
	key := filepath.Join(node.SpaceID, lookup.Pathify(node.BlobID, 4, 2))
	return filepath.Clean(key)
}
8 changes: 4 additions & 4 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ type Tree interface {
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error)

WriteBlob(key string, reader io.Reader) error
ReadBlob(key string) (io.ReadCloser, error)
DeleteBlob(key string) error
WriteBlob(node *node.Node, reader io.Reader) error
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error

Propagate(ctx context.Context, node *node.Node) (err error)
}
Expand Down Expand Up @@ -568,7 +568,7 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (
return nil, errtypes.PermissionDenied(filepath.Join(node.ParentID, node.Name))
}

reader, err := fs.tp.ReadBlob(node.BlobID)
reader, err := fs.tp.ReadBlob(node)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+node.ID+"'")
}
Expand Down
Loading

0 comments on commit c437e57

Please sign in to comment.