From 6c12ece9fbd4d705d945f9f3a3122e2a2b97e760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 5 Apr 2024 17:30:37 +0200 Subject: [PATCH 1/6] write blob based on session id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/fs/ocis/blobstore/blobstore.go | 31 ++++--- .../fs/ocis/blobstore/blobstore_test.go | 18 ++--- pkg/storage/fs/posix/blobstore/blobstore.go | 32 +++++--- .../fs/posix/blobstore/blobstore_test.go | 18 ++--- pkg/storage/fs/posix/tree/mocks/Blobstore.go | 80 ++++++++++--------- pkg/storage/fs/posix/tree/tree.go | 32 +++----- pkg/storage/fs/s3ng/blobstore/blobstore.go | 48 +++++++---- .../utils/decomposedfs/decomposedfs.go | 2 +- .../utils/decomposedfs/node/mocks/Tree.go | 79 +++++++++--------- pkg/storage/utils/decomposedfs/node/node.go | 6 +- .../utils/decomposedfs/recycle_test.go | 8 +- pkg/storage/utils/decomposedfs/revisions.go | 7 +- .../decomposedfs/tree/mocks/Blobstore.go | 80 ++++++++++--------- pkg/storage/utils/decomposedfs/tree/tree.go | 32 +++----- .../utils/decomposedfs/tree/tree_test.go | 8 +- .../utils/decomposedfs/upload/store.go | 10 ++- .../utils/decomposedfs/upload/upload.go | 12 +-- .../utils/decomposedfs/upload_async_test.go | 4 +- pkg/storage/utils/decomposedfs/upload_test.go | 10 +-- 19 files changed, 275 insertions(+), 242 deletions(-) diff --git a/pkg/storage/fs/ocis/blobstore/blobstore.go b/pkg/storage/fs/ocis/blobstore/blobstore.go index 7afd3eeb55..b04145b8ee 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore.go @@ -26,7 +26,6 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -49,8 +48,8 @@ func New(root string) (*Blobstore, error) { } // Upload stores some data in the 
blobstore under the given key -func (bs *Blobstore) Upload(node *node.Node, source string) error { - dest, err := bs.path(node) +func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -85,8 +84,8 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - dest, err := bs.path(node) +func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { + dest, err := bs.path(spaceID, blobID) if err != nil { return nil, err } @@ -94,12 +93,19 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { if err != nil { return nil, errors.Wrapf(err, "could not read blob '%s'", dest) } + fi, err := file.Stat() + if err != nil { + return nil, errors.Wrapf(err, "could not stat blob '%s'", dest) + } + if fi.Size() != blobSize { + return nil, fmt.Errorf("blob has unexpected size. 
%d bytes expected, got %d bytes", blobSize, fi.Size()) + } return file, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(node *node.Node) error { - dest, err := bs.path(node) +func (bs *Blobstore) Delete(spaceID, blobID string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -109,14 +115,17 @@ func (bs *Blobstore) Delete(node *node.Node) error { return nil } -func (bs *Blobstore) path(node *node.Node) (string, error) { - if node.BlobID == "" { - return "", fmt.Errorf("blobstore: BlobID is empty") +func (bs *Blobstore) path(spaceID, blobID string) (string, error) { + if spaceID == "" { + return "", fmt.Errorf("blobstore: spaceID is empty") + } + if blobID == "" { + return "", fmt.Errorf("blobstore: blobID is empty") } return filepath.Join( bs.root, filepath.Clean(filepath.Join( - "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)), + "/", "spaces", lookup.Pathify(spaceID, 1, 2), "blobs", lookup.Pathify(blobID, 4, 2)), ), ), nil } diff --git a/pkg/storage/fs/ocis/blobstore/blobstore_test.go b/pkg/storage/fs/ocis/blobstore/blobstore_test.go index e895497abc..114148f6de 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore_test.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore_test.go @@ -24,7 +24,6 @@ import ( "path" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/tests/helpers" . 
"github.com/onsi/ginkgo/v2" @@ -34,7 +33,8 @@ import ( var _ = Describe("Blobstore", func() { var ( tmpRoot string - blobNode *node.Node + spaceID string + blobID string blobPath string blobSrcFile string data []byte @@ -48,10 +48,8 @@ var _ = Describe("Blobstore", func() { Expect(err).ToNot(HaveOccurred()) data = []byte("1234567890") - blobNode = &node.Node{ - SpaceID: "wonderfullspace", - BlobID: "huuuuugeblob", - } + spaceID = "wonderfullspace" + blobID = "huuuuugeblob" blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob") blobSrcFile = path.Join(tmpRoot, "blobsrc") @@ -77,7 +75,7 @@ var _ = Describe("Blobstore", func() { Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed()) }) It("writes the blob", func() { - err := bs.Upload(blobNode, blobSrcFile) + err := bs.Upload(spaceID, blobID, int64(len(data)), blobSrcFile) Expect(err).ToNot(HaveOccurred()) writtenBytes, err := os.ReadFile(blobPath) @@ -95,7 +93,7 @@ var _ = Describe("Blobstore", func() { Describe("Download", func() { It("cleans the key", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID, int64(len(data))) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -104,7 +102,7 @@ var _ = Describe("Blobstore", func() { }) It("returns a reader to the blob", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID, int64(len(data))) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -118,7 +116,7 @@ var _ = Describe("Blobstore", func() { _, err := os.Stat(blobPath) Expect(err).ToNot(HaveOccurred()) - err = bs.Delete(blobNode) + err = bs.Delete(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) _, err = os.Stat(blobPath) diff --git a/pkg/storage/fs/posix/blobstore/blobstore.go b/pkg/storage/fs/posix/blobstore/blobstore.go index 7afd3eeb55..d61a05acad 100644 --- a/pkg/storage/fs/posix/blobstore/blobstore.go +++ 
b/pkg/storage/fs/posix/blobstore/blobstore.go @@ -26,7 +26,6 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -49,8 +48,8 @@ func New(root string) (*Blobstore, error) { } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(node *node.Node, source string) error { - dest, err := bs.path(node) +func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -85,8 +84,8 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - dest, err := bs.path(node) +func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { + dest, err := bs.path(spaceID, blobID) if err != nil { return nil, err } @@ -94,12 +93,20 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { if err != nil { return nil, errors.Wrapf(err, "could not read blob '%s'", dest) } + fi, err := file.Stat() + if err != nil { + return nil, errors.Wrapf(err, "could not stat blob '%s'", dest) + } + if fi.Size() != blobSize { + return nil, fmt.Errorf("blob has unexpected size. 
%d bytes expected, got %d bytes", blobSize, fi.Size()) + } + return file, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(node *node.Node) error { - dest, err := bs.path(node) +func (bs *Blobstore) Delete(spaceID, blobID string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { return err } @@ -109,14 +116,17 @@ func (bs *Blobstore) Delete(node *node.Node) error { return nil } -func (bs *Blobstore) path(node *node.Node) (string, error) { - if node.BlobID == "" { - return "", fmt.Errorf("blobstore: BlobID is empty") +func (bs *Blobstore) path(spaceID, blobID string) (string, error) { + if spaceID == "" { + return "", fmt.Errorf("blobstore: spaceID is empty") + } + if blobID == "" { + return "", fmt.Errorf("blobstore: blobID is empty") } return filepath.Join( bs.root, filepath.Clean(filepath.Join( - "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)), + "/", "spaces", lookup.Pathify(spaceID, 1, 2), "blobs", lookup.Pathify(blobID, 4, 2)), ), ), nil } diff --git a/pkg/storage/fs/posix/blobstore/blobstore_test.go b/pkg/storage/fs/posix/blobstore/blobstore_test.go index e895497abc..114148f6de 100644 --- a/pkg/storage/fs/posix/blobstore/blobstore_test.go +++ b/pkg/storage/fs/posix/blobstore/blobstore_test.go @@ -24,7 +24,6 @@ import ( "path" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/tests/helpers" . 
"github.com/onsi/ginkgo/v2" @@ -34,7 +33,8 @@ import ( var _ = Describe("Blobstore", func() { var ( tmpRoot string - blobNode *node.Node + spaceID string + blobID string blobPath string blobSrcFile string data []byte @@ -48,10 +48,8 @@ var _ = Describe("Blobstore", func() { Expect(err).ToNot(HaveOccurred()) data = []byte("1234567890") - blobNode = &node.Node{ - SpaceID: "wonderfullspace", - BlobID: "huuuuugeblob", - } + spaceID = "wonderfullspace" + blobID = "huuuuugeblob" blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob") blobSrcFile = path.Join(tmpRoot, "blobsrc") @@ -77,7 +75,7 @@ var _ = Describe("Blobstore", func() { Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed()) }) It("writes the blob", func() { - err := bs.Upload(blobNode, blobSrcFile) + err := bs.Upload(spaceID, blobID, int64(len(data)), blobSrcFile) Expect(err).ToNot(HaveOccurred()) writtenBytes, err := os.ReadFile(blobPath) @@ -95,7 +93,7 @@ var _ = Describe("Blobstore", func() { Describe("Download", func() { It("cleans the key", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID, int64(len(data))) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -104,7 +102,7 @@ var _ = Describe("Blobstore", func() { }) It("returns a reader to the blob", func() { - reader, err := bs.Download(blobNode) + reader, err := bs.Download(spaceID, blobID, int64(len(data))) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -118,7 +116,7 @@ var _ = Describe("Blobstore", func() { _, err := os.Stat(blobPath) Expect(err).ToNot(HaveOccurred()) - err = bs.Delete(blobNode) + err = bs.Delete(spaceID, blobID) Expect(err).ToNot(HaveOccurred()) _, err = os.Stat(blobPath) diff --git a/pkg/storage/fs/posix/tree/mocks/Blobstore.go b/pkg/storage/fs/posix/tree/mocks/Blobstore.go index 3f23a832e1..01640bbc20 100644 --- a/pkg/storage/fs/posix/tree/mocks/Blobstore.go +++ 
b/pkg/storage/fs/posix/tree/mocks/Blobstore.go @@ -23,7 +23,6 @@ package mocks import ( io "io" - node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" mock "github.com/stretchr/testify/mock" ) @@ -40,17 +39,17 @@ func (_m *Blobstore) EXPECT() *Blobstore_Expecter { return &Blobstore_Expecter{mock: &_m.Mock} } -// Delete provides a mock function with given fields: _a0 -func (_m *Blobstore) Delete(_a0 *node.Node) error { - ret := _m.Called(_a0) +// Delete provides a mock function with given fields: spaceID, blobID +func (_m *Blobstore) Delete(spaceID string, blobID string) error { + ret := _m.Called(spaceID, blobID) if len(ret) == 0 { panic("no return value specified for Delete") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(spaceID, blobID) } else { r0 = ret.Error(0) } @@ -64,14 +63,15 @@ type Blobstore_Delete_Call struct { } // Delete is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Delete(_a0 interface{}) *Blobstore_Delete_Call { - return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", _a0)} +// - spaceID string +// - blobID string +func (_e *Blobstore_Expecter) Delete(spaceID interface{}, blobID interface{}) *Blobstore_Delete_Call { + return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", spaceID, blobID)} } -func (_c *Blobstore_Delete_Call) Run(run func(_a0 *node.Node)) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -81,14 +81,14 @@ func (_c *Blobstore_Delete_Call) Return(_a0 error) *Blobstore_Delete_Call { return _c } -func (_c *Blobstore_Delete_Call) RunAndReturn(run func(*node.Node) error) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) RunAndReturn(run func(string, 
string) error) *Blobstore_Delete_Call { _c.Call.Return(run) return _c } -// Download provides a mock function with given fields: _a0 -func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { - ret := _m.Called(_a0) +// Download provides a mock function with given fields: spaceID, blobID, blobSize +func (_m *Blobstore) Download(spaceID string, blobID string, blobSize int64) (io.ReadCloser, error) { + ret := _m.Called(spaceID, blobID, blobSize) if len(ret) == 0 { panic("no return value specified for Download") @@ -96,19 +96,19 @@ func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { - return rf(_a0) + if rf, ok := ret.Get(0).(func(string, string, int64) (io.ReadCloser, error)); ok { + return rf(spaceID, blobID, blobSize) } - if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string, int64) io.ReadCloser); ok { + r0 = rf(spaceID, blobID, blobSize) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(*node.Node) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { + r1 = rf(spaceID, blobID, blobSize) } else { r1 = ret.Error(1) } @@ -122,14 +122,16 @@ type Blobstore_Download_Call struct { } // Download is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Download(_a0 interface{}) *Blobstore_Download_Call { - return &Blobstore_Download_Call{Call: _e.mock.On("Download", _a0)} +// - spaceID string +// - blobID string +// - blobSize int64 +func (_e *Blobstore_Expecter) Download(spaceID interface{}, blobID interface{}, blobSize interface{}) *Blobstore_Download_Call { + return &Blobstore_Download_Call{Call: _e.mock.On("Download", spaceID, blobID, blobSize)} } -func (_c *Blobstore_Download_Call) Run(run func(_a0 *node.Node)) 
*Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) Run(run func(spaceID string, blobID string, blobSize int64)) *Blobstore_Download_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string), args[2].(int64)) }) return _c } @@ -139,22 +141,22 @@ func (_c *Blobstore_Download_Call) Return(_a0 io.ReadCloser, _a1 error) *Blobsto return _c } -func (_c *Blobstore_Download_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) RunAndReturn(run func(string, string, int64) (io.ReadCloser, error)) *Blobstore_Download_Call { _c.Call.Return(run) return _c } -// Upload provides a mock function with given fields: _a0, source -func (_m *Blobstore) Upload(_a0 *node.Node, source string) error { - ret := _m.Called(_a0, source) +// Upload provides a mock function with given fields: spaceID, blobID, blobSize, source +func (_m *Blobstore) Upload(spaceID string, blobID string, blobSize int64, source string) error { + ret := _m.Called(spaceID, blobID, blobSize, source) if len(ret) == 0 { panic("no return value specified for Upload") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { - r0 = rf(_a0, source) + if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { + r0 = rf(spaceID, blobID, blobSize, source) } else { r0 = ret.Error(0) } @@ -168,15 +170,17 @@ type Blobstore_Upload_Call struct { } // Upload is a helper method to define mock.On call -// - _a0 *node.Node +// - spaceID string +// - blobID string +// - blobSize int64 // - source string -func (_e *Blobstore_Expecter) Upload(_a0 interface{}, source interface{}) *Blobstore_Upload_Call { - return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", _a0, source)} +func (_e *Blobstore_Expecter) Upload(spaceID interface{}, blobID interface{}, blobSize interface{}, source interface{}) *Blobstore_Upload_Call { + return &Blobstore_Upload_Call{Call: 
_e.mock.On("Upload", spaceID, blobID, blobSize, source)} } -func (_c *Blobstore_Upload_Call) Run(run func(_a0 *node.Node, source string)) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) Run(run func(spaceID string, blobID string, blobSize int64, source string)) *Blobstore_Upload_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node), args[1].(string)) + run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) }) return _c } @@ -186,7 +190,7 @@ func (_c *Blobstore_Upload_Call) Return(_a0 error) *Blobstore_Upload_Call { return _c } -func (_c *Blobstore_Upload_Call) RunAndReturn(run func(*node.Node, string) error) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) RunAndReturn(run func(string, string, int64, string) error) *Blobstore_Upload_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/fs/posix/tree/tree.go b/pkg/storage/fs/posix/tree/tree.go index b157133cac..ad4ed80ffc 100644 --- a/pkg/storage/fs/posix/tree/tree.go +++ b/pkg/storage/fs/posix/tree/tree.go @@ -21,7 +21,6 @@ package tree import ( "bytes" "context" - "fmt" "io" "io/fs" "os" @@ -56,9 +55,9 @@ func init() { // Blobstore defines an interface for storing blobs in a blobstore type Blobstore interface { - Upload(node *node.Node, source string) error - Download(node *node.Node) (io.ReadCloser, error) - Delete(node *node.Node) error + Upload(spaceID, blobID string, blobSize int64, source string) error + Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) + Delete(spaceID, blobID string) error } // Tree manages a hierarchical tree @@ -616,7 +615,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n); err != nil { + if err := t.DeleteBlob(n.SpaceID, n.BlobID); err != nil { log.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -645,7 +644,7 @@ func (t *Tree) removeNode(ctx 
context.Context, path string, n *node.Node) error } if bID != "" { - if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { + if err := t.DeleteBlob(n.SpaceID, bID); err != nil { log.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -662,29 +661,22 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(node *node.Node, source string) error { - return t.blobstore.Upload(node, source) +func (t *Tree) WriteBlob(spaceID, blobID string, blobSize int64, source string) error { + return t.blobstore.Upload(spaceID, blobID, blobSize, source) } // ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { - if node.BlobID == "" { +func (t *Tree) ReadBlob(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { + if blobID == "" { // there is no blob yet - we are dealing with a 0 byte file return io.NopCloser(bytes.NewReader([]byte{})), nil } - return t.blobstore.Download(node) + return t.blobstore.Download(spaceID, blobID, blobSize) } // DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(node *node.Node) error { - if node == nil { - return fmt.Errorf("could not delete blob, nil node was given") - } - if node.BlobID == "" { - return fmt.Errorf("could not delete blob, node with empty blob id was given") - } - - return t.blobstore.Delete(node) +func (t *Tree) DeleteBlob(spaceID, blobID string) error { + return t.blobstore.Delete(spaceID, blobID) } // TODO check if node exists? 
diff --git a/pkg/storage/fs/s3ng/blobstore/blobstore.go b/pkg/storage/fs/s3ng/blobstore/blobstore.go index 48a40b764f..3150ed98c1 100644 --- a/pkg/storage/fs/s3ng/blobstore/blobstore.go +++ b/pkg/storage/fs/s3ng/blobstore/blobstore.go @@ -27,7 +27,6 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" @@ -76,14 +75,19 @@ func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOption } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(node *node.Node, source string) error { +func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { + dest, err := bs.path(spaceID, blobID) + if err != nil { + return err + } + reader, err := os.Open(source) if err != nil { return errors.Wrap(err, "can not open source file to upload") } defer reader.Close() - _, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ + _, err = bs.client.PutObject(context.Background(), bs.bucket, dest, reader, blobSize, minio.PutObjectOptions{ ContentType: "application/octet-stream", SendContentMd5: bs.defaultPutOptions.SendContentMd5, ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts, @@ -94,40 +98,54 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error { }) if err != nil { - return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket) + return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", dest, bs.bucket) } return nil } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { - reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.path(node), 
minio.GetObjectOptions{}) +func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { + dest, err := bs.path(spaceID, blobID) + if err != nil { + return nil, err + } + reader, err := bs.client.GetObject(context.Background(), bs.bucket, dest, minio.GetObjectOptions{}) if err != nil { - return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.path(node), bs.bucket) + return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", dest, bs.bucket) } stat, err := reader.Stat() if err != nil { - return nil, errors.Wrapf(err, "blob path: %s", bs.path(node)) + return nil, errors.Wrapf(err, "blob path: %s", dest) } - if node.Blobsize != stat.Size { - return nil, fmt.Errorf("blob has unexpected size. %d bytes expected, got %d bytes", node.Blobsize, stat.Size) + if blobSize != stat.Size { + return nil, fmt.Errorf("blob has unexpected size. %d bytes expected, got %d bytes", blobSize, stat.Size) } return reader, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(node *node.Node) error { - err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.path(node), minio.RemoveObjectOptions{}) +func (bs *Blobstore) Delete(spaceID, blobID string) error { + dest, err := bs.path(spaceID, blobID) if err != nil { - return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.path(node), bs.bucket) + return err + } + err = bs.client.RemoveObject(context.Background(), bs.bucket, dest, minio.RemoveObjectOptions{}) + if err != nil { + return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", dest, bs.bucket) } return nil } -func (bs *Blobstore) path(node *node.Node) string { +func (bs *Blobstore) path(spaceID, blobID string) (string, error) { + if spaceID == "" { + return "", fmt.Errorf("blobstore: spaceID is empty") + } + if blobID == "" { + return "", fmt.Errorf("blobstore: blobID is empty") + } // 
https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/ // Prefixes are used to partion a bucket. A prefix is everything except the filename. // For a file `BucketName/foo/bar/lorem.ipsum`, `BucketName/foo/bar/` is the prefix. @@ -136,5 +154,5 @@ func (bs *Blobstore) path(node *node.Node) string { // // Since the spaceID is always the same for a space, we don't need to pathify that, because it would // not yield any performance gains - return filepath.Clean(filepath.Join(node.SpaceID, lookup.Pathify(node.BlobID, 4, 2))) + return filepath.Clean(filepath.Join(spaceID, lookup.Pathify(blobID, 4, 2))), nil } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index a9754c0743..ac7fdfe5eb 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -1045,7 +1045,7 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) ( if currentEtag != expectedEtag { return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag)) } - reader, err := fs.tp.ReadBlob(n) + reader, err := fs.tp.ReadBlob(n.SpaceID, n.BlobID, n.Blobsize) if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'") } diff --git a/pkg/storage/utils/decomposedfs/node/mocks/Tree.go b/pkg/storage/utils/decomposedfs/node/mocks/Tree.go index d6a07c8137..410cc1cd82 100644 --- a/pkg/storage/utils/decomposedfs/node/mocks/Tree.go +++ b/pkg/storage/utils/decomposedfs/node/mocks/Tree.go @@ -138,17 +138,17 @@ func (_c *Tree_Delete_Call) RunAndReturn(run func(context.Context, *node.Node) e return _c } -// DeleteBlob provides a mock function with given fields: _a0 -func (_m *Tree) DeleteBlob(_a0 *node.Node) error { - ret := _m.Called(_a0) +// DeleteBlob provides a mock function with given fields: spaceID, blobId +func (_m *Tree) DeleteBlob(spaceID string, blobId string) error { + 
ret := _m.Called(spaceID, blobId) if len(ret) == 0 { panic("no return value specified for DeleteBlob") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(spaceID, blobId) } else { r0 = ret.Error(0) } @@ -162,14 +162,15 @@ type Tree_DeleteBlob_Call struct { } // DeleteBlob is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Tree_Expecter) DeleteBlob(_a0 interface{}) *Tree_DeleteBlob_Call { - return &Tree_DeleteBlob_Call{Call: _e.mock.On("DeleteBlob", _a0)} +// - spaceID string +// - blobId string +func (_e *Tree_Expecter) DeleteBlob(spaceID interface{}, blobId interface{}) *Tree_DeleteBlob_Call { + return &Tree_DeleteBlob_Call{Call: _e.mock.On("DeleteBlob", spaceID, blobId)} } -func (_c *Tree_DeleteBlob_Call) Run(run func(_a0 *node.Node)) *Tree_DeleteBlob_Call { +func (_c *Tree_DeleteBlob_Call) Run(run func(spaceID string, blobId string)) *Tree_DeleteBlob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -179,7 +180,7 @@ func (_c *Tree_DeleteBlob_Call) Return(_a0 error) *Tree_DeleteBlob_Call { return _c } -func (_c *Tree_DeleteBlob_Call) RunAndReturn(run func(*node.Node) error) *Tree_DeleteBlob_Call { +func (_c *Tree_DeleteBlob_Call) RunAndReturn(run func(string, string) error) *Tree_DeleteBlob_Call { _c.Call.Return(run) return _c } @@ -468,9 +469,9 @@ func (_c *Tree_PurgeRecycleItemFunc_Call) RunAndReturn(run func(context.Context, return _c } -// ReadBlob provides a mock function with given fields: _a0 -func (_m *Tree) ReadBlob(_a0 *node.Node) (io.ReadCloser, error) { - ret := _m.Called(_a0) +// ReadBlob provides a mock function with given fields: spaceID, blobId, blobSize +func (_m *Tree) ReadBlob(spaceID string, blobId string, blobSize int64) (io.ReadCloser, error) { + ret := _m.Called(spaceID, blobId, blobSize) if len(ret) == 0 { panic("no return value 
specified for ReadBlob") @@ -478,19 +479,19 @@ func (_m *Tree) ReadBlob(_a0 *node.Node) (io.ReadCloser, error) { var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { - return rf(_a0) + if rf, ok := ret.Get(0).(func(string, string, int64) (io.ReadCloser, error)); ok { + return rf(spaceID, blobId, blobSize) } - if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string, int64) io.ReadCloser); ok { + r0 = rf(spaceID, blobId, blobSize) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(*node.Node) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { + r1 = rf(spaceID, blobId, blobSize) } else { r1 = ret.Error(1) } @@ -504,14 +505,16 @@ type Tree_ReadBlob_Call struct { } // ReadBlob is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Tree_Expecter) ReadBlob(_a0 interface{}) *Tree_ReadBlob_Call { - return &Tree_ReadBlob_Call{Call: _e.mock.On("ReadBlob", _a0)} +// - spaceID string +// - blobId string +// - blobSize int64 +func (_e *Tree_Expecter) ReadBlob(spaceID interface{}, blobId interface{}, blobSize interface{}) *Tree_ReadBlob_Call { + return &Tree_ReadBlob_Call{Call: _e.mock.On("ReadBlob", spaceID, blobId, blobSize)} } -func (_c *Tree_ReadBlob_Call) Run(run func(_a0 *node.Node)) *Tree_ReadBlob_Call { +func (_c *Tree_ReadBlob_Call) Run(run func(spaceID string, blobId string, blobSize int64)) *Tree_ReadBlob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string), args[2].(int64)) }) return _c } @@ -521,7 +524,7 @@ func (_c *Tree_ReadBlob_Call) Return(_a0 io.ReadCloser, _a1 error) *Tree_ReadBlo return _c } -func (_c *Tree_ReadBlob_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Tree_ReadBlob_Call { +func (_c *Tree_ReadBlob_Call) RunAndReturn(run func(string, 
string, int64) (io.ReadCloser, error)) *Tree_ReadBlob_Call { _c.Call.Return(run) return _c } @@ -700,17 +703,17 @@ func (_c *Tree_TouchFile_Call) RunAndReturn(run func(context.Context, *node.Node return _c } -// WriteBlob provides a mock function with given fields: _a0, source -func (_m *Tree) WriteBlob(_a0 *node.Node, source string) error { - ret := _m.Called(_a0, source) +// WriteBlob provides a mock function with given fields: spaceID, blobId, blobSize, source +func (_m *Tree) WriteBlob(spaceID string, blobId string, blobSize int64, source string) error { + ret := _m.Called(spaceID, blobId, blobSize, source) if len(ret) == 0 { panic("no return value specified for WriteBlob") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { - r0 = rf(_a0, source) + if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { + r0 = rf(spaceID, blobId, blobSize, source) } else { r0 = ret.Error(0) } @@ -724,15 +727,17 @@ type Tree_WriteBlob_Call struct { } // WriteBlob is a helper method to define mock.On call -// - _a0 *node.Node +// - spaceID string +// - blobId string +// - blobSize int64 // - source string -func (_e *Tree_Expecter) WriteBlob(_a0 interface{}, source interface{}) *Tree_WriteBlob_Call { - return &Tree_WriteBlob_Call{Call: _e.mock.On("WriteBlob", _a0, source)} +func (_e *Tree_Expecter) WriteBlob(spaceID interface{}, blobId interface{}, blobSize interface{}, source interface{}) *Tree_WriteBlob_Call { + return &Tree_WriteBlob_Call{Call: _e.mock.On("WriteBlob", spaceID, blobId, blobSize, source)} } -func (_c *Tree_WriteBlob_Call) Run(run func(_a0 *node.Node, source string)) *Tree_WriteBlob_Call { +func (_c *Tree_WriteBlob_Call) Run(run func(spaceID string, blobId string, blobSize int64, source string)) *Tree_WriteBlob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node), args[1].(string)) + run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) }) return _c } @@ -742,7 +747,7 @@ func 
(_c *Tree_WriteBlob_Call) Return(_a0 error) *Tree_WriteBlob_Call { return _c } -func (_c *Tree_WriteBlob_Call) RunAndReturn(run func(*node.Node, string) error) *Tree_WriteBlob_Call { +func (_c *Tree_WriteBlob_Call) RunAndReturn(run func(string, string, int64, string) error) *Tree_WriteBlob_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index f1db7f5eb8..5907df490f 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -99,9 +99,9 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error) - WriteBlob(node *Node, source string) error - ReadBlob(node *Node) (io.ReadCloser, error) - DeleteBlob(node *Node) error + WriteBlob(spaceID, blobId string, blobSize int64, source string) error + ReadBlob(spaceID, blobId string, blobSize int64) (io.ReadCloser, error) + DeleteBlob(spaceID, blobId string) error Propagate(ctx context.Context, node *Node, sizeDiff int64) (err error) } diff --git a/pkg/storage/utils/decomposedfs/recycle_test.go b/pkg/storage/utils/decomposedfs/recycle_test.go index 1af163d093..7dd0f365d9 100644 --- a/pkg/storage/utils/decomposedfs/recycle_test.go +++ b/pkg/storage/utils/decomposedfs/recycle_test.go @@ -86,7 +86,7 @@ var _ = Describe("Recycle", func() { It("they can be permanently deleted by this user", func() { // mock call to blobstore - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -104,7 +104,7 @@ var _ = Describe("Recycle", func() { }) It("they 
can be restored", func() { - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -177,7 +177,7 @@ var _ = Describe("Recycle", func() { }) It("they can be permanently deleted by the other user", func() { - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -206,7 +206,7 @@ var _ = Describe("Recycle", func() { }) It("they can be restored by the other user", func() { - env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index c079c3bddb..0775c354cd 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -152,14 +152,13 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey) } + blobsize, err := fs.lu.ReadBlobSizeAttr(ctx, contentPath) if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not read blob size of revision '%s' for node '%s'", n.ID, revisionKey) } - revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the 
s3ng blobstore - - reader, err := fs.tp.ReadBlob(&revisionNode) + reader, err := fs.tp.ReadBlob(spaceID, blobid, blobsize) if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey) } @@ -320,7 +319,7 @@ func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Refere return err } - return fs.tp.DeleteBlob(n) + return fs.tp.DeleteBlob(n.SpaceID, n.BlobID) } func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) { diff --git a/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go b/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go index 3f23a832e1..01640bbc20 100644 --- a/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go +++ b/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go @@ -23,7 +23,6 @@ package mocks import ( io "io" - node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" mock "github.com/stretchr/testify/mock" ) @@ -40,17 +39,17 @@ func (_m *Blobstore) EXPECT() *Blobstore_Expecter { return &Blobstore_Expecter{mock: &_m.Mock} } -// Delete provides a mock function with given fields: _a0 -func (_m *Blobstore) Delete(_a0 *node.Node) error { - ret := _m.Called(_a0) +// Delete provides a mock function with given fields: spaceID, blobID +func (_m *Blobstore) Delete(spaceID string, blobID string) error { + ret := _m.Called(spaceID, blobID) if len(ret) == 0 { panic("no return value specified for Delete") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(spaceID, blobID) } else { r0 = ret.Error(0) } @@ -64,14 +63,15 @@ type Blobstore_Delete_Call struct { } // Delete is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Delete(_a0 interface{}) *Blobstore_Delete_Call { - return 
&Blobstore_Delete_Call{Call: _e.mock.On("Delete", _a0)} +// - spaceID string +// - blobID string +func (_e *Blobstore_Expecter) Delete(spaceID interface{}, blobID interface{}) *Blobstore_Delete_Call { + return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", spaceID, blobID)} } -func (_c *Blobstore_Delete_Call) Run(run func(_a0 *node.Node)) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string)) }) return _c } @@ -81,14 +81,14 @@ func (_c *Blobstore_Delete_Call) Return(_a0 error) *Blobstore_Delete_Call { return _c } -func (_c *Blobstore_Delete_Call) RunAndReturn(run func(*node.Node) error) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) RunAndReturn(run func(string, string) error) *Blobstore_Delete_Call { _c.Call.Return(run) return _c } -// Download provides a mock function with given fields: _a0 -func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { - ret := _m.Called(_a0) +// Download provides a mock function with given fields: spaceID, blobID, blobSize +func (_m *Blobstore) Download(spaceID string, blobID string, blobSize int64) (io.ReadCloser, error) { + ret := _m.Called(spaceID, blobID, blobSize) if len(ret) == 0 { panic("no return value specified for Download") @@ -96,19 +96,19 @@ func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { - return rf(_a0) + if rf, ok := ret.Get(0).(func(string, string, int64) (io.ReadCloser, error)); ok { + return rf(spaceID, blobID, blobSize) } - if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, string, int64) io.ReadCloser); ok { + r0 = rf(spaceID, blobID, blobSize) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } 
} - if rf, ok := ret.Get(1).(func(*node.Node) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { + r1 = rf(spaceID, blobID, blobSize) } else { r1 = ret.Error(1) } @@ -122,14 +122,16 @@ type Blobstore_Download_Call struct { } // Download is a helper method to define mock.On call -// - _a0 *node.Node -func (_e *Blobstore_Expecter) Download(_a0 interface{}) *Blobstore_Download_Call { - return &Blobstore_Download_Call{Call: _e.mock.On("Download", _a0)} +// - spaceID string +// - blobID string +// - blobSize int64 +func (_e *Blobstore_Expecter) Download(spaceID interface{}, blobID interface{}, blobSize interface{}) *Blobstore_Download_Call { + return &Blobstore_Download_Call{Call: _e.mock.On("Download", spaceID, blobID, blobSize)} } -func (_c *Blobstore_Download_Call) Run(run func(_a0 *node.Node)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) Run(run func(spaceID string, blobID string, blobSize int64)) *Blobstore_Download_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node)) + run(args[0].(string), args[1].(string), args[2].(int64)) }) return _c } @@ -139,22 +141,22 @@ func (_c *Blobstore_Download_Call) Return(_a0 io.ReadCloser, _a1 error) *Blobsto return _c } -func (_c *Blobstore_Download_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) RunAndReturn(run func(string, string, int64) (io.ReadCloser, error)) *Blobstore_Download_Call { _c.Call.Return(run) return _c } -// Upload provides a mock function with given fields: _a0, source -func (_m *Blobstore) Upload(_a0 *node.Node, source string) error { - ret := _m.Called(_a0, source) +// Upload provides a mock function with given fields: spaceID, blobID, blobSize, source +func (_m *Blobstore) Upload(spaceID string, blobID string, blobSize int64, source string) error { + ret := _m.Called(spaceID, blobID, blobSize, source) if len(ret) == 0 { panic("no return value 
specified for Upload") } var r0 error - if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { - r0 = rf(_a0, source) + if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { + r0 = rf(spaceID, blobID, blobSize, source) } else { r0 = ret.Error(0) } @@ -168,15 +170,17 @@ type Blobstore_Upload_Call struct { } // Upload is a helper method to define mock.On call -// - _a0 *node.Node +// - spaceID string +// - blobID string +// - blobSize int64 // - source string -func (_e *Blobstore_Expecter) Upload(_a0 interface{}, source interface{}) *Blobstore_Upload_Call { - return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", _a0, source)} +func (_e *Blobstore_Expecter) Upload(spaceID interface{}, blobID interface{}, blobSize interface{}, source interface{}) *Blobstore_Upload_Call { + return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", spaceID, blobID, blobSize, source)} } -func (_c *Blobstore_Upload_Call) Run(run func(_a0 *node.Node, source string)) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) Run(run func(spaceID string, blobID string, blobSize int64, source string)) *Blobstore_Upload_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*node.Node), args[1].(string)) + run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) }) return _c } @@ -186,7 +190,7 @@ func (_c *Blobstore_Upload_Call) Return(_a0 error) *Blobstore_Upload_Call { return _c } -func (_c *Blobstore_Upload_Call) RunAndReturn(run func(*node.Node, string) error) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) RunAndReturn(run func(string, string, int64, string) error) *Blobstore_Upload_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 3d8a057d8b..571fd07943 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -21,7 +21,6 @@ package tree import ( "bytes" "context" - "fmt" "io" 
"io/fs" "os" @@ -55,9 +54,9 @@ func init() { // Blobstore defines an interface for storing blobs in a blobstore type Blobstore interface { - Upload(node *node.Node, source string) error - Download(node *node.Node) (io.ReadCloser, error) - Delete(node *node.Node) error + Upload(spaceID, blobID string, blobSize int64, source string) error + Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) + Delete(spaceID, blobID string) error } // Tree manages a hierarchical tree @@ -695,7 +694,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node. // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n); err != nil { + if err := t.DeleteBlob(n.SpaceID, n.BlobID); err != nil { logger.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -724,7 +723,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node. } if bID != "" { - if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { + if err := t.DeleteBlob(n.SpaceID, bID); err != nil { logger.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -741,29 +740,22 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(node *node.Node, source string) error { - return t.blobstore.Upload(node, source) +func (t *Tree) WriteBlob(spaceID, blobID string, blobSize int64, source string) error { + return t.blobstore.Upload(spaceID, blobID, blobSize, source) } // ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { - if node.BlobID == "" { +func (t *Tree) ReadBlob(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { + if blobID == "" { // there is no blob yet - we are dealing with a 0 byte file return io.NopCloser(bytes.NewReader([]byte{})), nil } - return 
t.blobstore.Download(node) + return t.blobstore.Download(spaceID, blobID, blobSize) } // DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(node *node.Node) error { - if node == nil { - return fmt.Errorf("could not delete blob, nil node was given") - } - if node.BlobID == "" { - return fmt.Errorf("could not delete blob, node with empty blob id was given") - } - - return t.blobstore.Delete(node) +func (t *Tree) DeleteBlob(spaceID, blobID string) error { + return t.blobstore.Delete(spaceID, blobID) } // TODO check if node exists? diff --git a/pkg/storage/utils/decomposedfs/tree/tree_test.go b/pkg/storage/utils/decomposedfs/tree/tree_test.go index 74acdf7473..2f9bb39712 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree_test.go +++ b/pkg/storage/utils/decomposedfs/tree/tree_test.go @@ -128,7 +128,7 @@ var _ = Describe("Tree", func() { }) It("does not delete the blob from the blobstore", func() { - env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node")) + env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")) }) }) }) @@ -139,7 +139,7 @@ var _ = Describe("Tree", func() { ) JustBeforeEach(func() { - env.Blobstore.On("Delete", mock.AnythingOfType("*node.Node")).Return(nil) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil) trashPath = path.Join(env.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) Expect(t.Delete(env.Ctx, n)).To(Succeed()) }) @@ -160,7 +160,7 @@ var _ = Describe("Tree", func() { }) It("deletes the blob from the blobstore", func() { - env.Blobstore.AssertCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node")) + env.Blobstore.AssertCalled(GinkgoT(), "Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")) }) }) @@ -264,7 +264,7 @@ var _ = Describe("Tree", func() { trashPath = path.Join(env.Root, "spaces", 
lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) Expect(t.Delete(env.Ctx, n)).To(Succeed()) - env.Blobstore.On("Delete", mock.Anything).Return(nil) + env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil) }) Describe("PurgeRecycleItemFunc", func() { diff --git a/pkg/storage/utils/decomposedfs/upload/store.go b/pkg/storage/utils/decomposedfs/upload/store.go index 57a7aea8f4..aeab235de1 100644 --- a/pkg/storage/utils/decomposedfs/upload/store.go +++ b/pkg/storage/utils/decomposedfs/upload/store.go @@ -205,7 +205,15 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. } var f *lockedfile.File - if session.NodeExists() { + if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node + // we do not need to propagate a change when a node is created, only when the upload is ready. + // that still creates problems for desktop clients because if another change causes propagation it will detects an empty file + // so the first upload has to point to the first revision with the expected size. The file cannot be downloaded, but it can be overwritten (which will create a new revision and make the node reflect the latest revision) + // any finished postprocessing will not affect the node metadata. + // *thinking* but then initializing an upload will lock the file until the upload has finished. That sucks. + // so we have to check if the node has been created meanwhile (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists()) + // FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter! 
+ f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size())) if f != nil { appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 9244052a10..6016ec84d2 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -67,9 +67,9 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) - WriteBlob(node *node.Node, binPath string) error - ReadBlob(node *node.Node) (io.ReadCloser, error) - DeleteBlob(node *node.Node) error + WriteBlob(spaceID, blobID string, blobSize int64, binPath string) error + ReadBlob(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) + DeleteBlob(spaceID, blobID string) error Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) } @@ -278,14 +278,10 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo func (session *OcisSession) Finalize() (err error) { ctx, span := tracer.Start(session.Context(context.Background()), "Finalize") defer span.End() - n, err := session.Node(ctx) - if err != nil { - return err - } // upload the data to the blobstore _, subspan := tracer.Start(ctx, "WriteBlob") - err = session.store.tp.WriteBlob(n, session.binPath()) + err = session.store.tp.WriteBlob(session.SpaceID(), session.ID(), session.Size(), session.binPath()) subspan.End() if err != nil { return errors.Wrap(err, "failed to upload file to blobstore") diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index d200bebe5b..fa48cb2af7 100644 --- 
a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -138,10 +138,10 @@ var _ = Describe("Async file uploads", Ordered, func() { Expect(err).ToNot(HaveOccurred()) ref.ResourceId = &resID - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil). Run(func(args mock.Arguments) { - data, err := os.ReadFile(args.Get(1).(string)) + data, err := os.ReadFile(args.Get(3).(string)) Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal(fileContent)) diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 5899b59f96..90e37edb58 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -238,7 +238,7 @@ var _ = Describe("File uploads", func() { When("the user initiates a zero byte file upload", func() { It("succeeds", func() { - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil) uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{}) @@ -253,7 +253,7 @@ var _ = Describe("File uploads", func() { }) It("fails when trying to upload empty data. 0-byte uploads are finished during initialization already", func() { - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). 
Return(nil) uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{}) @@ -288,10 +288,10 @@ var _ = Describe("File uploads", func() { uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]} - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). + bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil). Run(func(args mock.Arguments) { - data, err := os.ReadFile(args.Get(1).(string)) + data, err := os.ReadFile(args.Get(3).(string)) Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal([]byte("0123456789"))) @@ -304,7 +304,7 @@ var _ = Describe("File uploads", func() { }, nil) Expect(err).ToNot(HaveOccurred()) - bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) + bs.AssertCalled(GinkgoT(), "Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{}) From 59aea3537a17f2124e20c632a3b8d3db501525fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 9 Apr 2024 13:44:53 +0200 Subject: [PATCH 2/6] add async size diff test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../utils/decomposedfs/upload_async_test.go | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index fa48cb2af7..dc1ba38a27 100644 --- a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -61,7 +61,8 @@ var _ = Describe("Async file uploads", Ordered, func() { Username: "username", } - fileContent = []byte("0123456789") + fileContent = []byte("0123456789") + 
file2Content = []byte("01234567890123456789") ctx = ruser.ContextSetUser(context.Background(), user) @@ -141,10 +142,10 @@ var _ = Describe("Async file uploads", Ordered, func() { bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). Return(nil). Run(func(args mock.Arguments) { + size := args.Get(2).(int64) data, err := os.ReadFile(args.Get(3).(string)) - Expect(err).ToNot(HaveOccurred()) - Expect(data).To(Equal(fileContent)) + Expect(len(data)).To(Equal(int(size))) }) // start upload of a file @@ -412,7 +413,7 @@ var _ = Describe("Async file uploads", Ordered, func() { JustBeforeEach(func() { // upload again - uploadIds, err := fs.InitiateUpload(ctx, ref, 10, map[string]string{}) + uploadIds, err := fs.InitiateUpload(ctx, ref, 20, map[string]string{}) Expect(err).ToNot(HaveOccurred()) Expect(len(uploadIds)).To(Equal(2)) Expect(uploadIds["simple"]).ToNot(BeEmpty()) @@ -422,8 +423,8 @@ var _ = Describe("Async file uploads", Ordered, func() { _, err = fs.Upload(ctx, storage.UploadRequest{ Ref: uploadRef, - Body: io.NopCloser(bytes.NewReader(fileContent)), - Length: int64(len(fileContent)), + Body: io.NopCloser(bytes.NewReader(file2Content)), + Length: int64(len(file2Content)), }, nil) Expect(err).ToNot(HaveOccurred()) @@ -456,7 +457,7 @@ var _ = Describe("Async file uploads", Ordered, func() { }) It("removes processing status when second upload is finished, even if first isn't", func() { - // finish postprocessing + // finish postprocessing of second upload con <- events.PostprocessingFinished{ UploadID: secondUploadID, Outcome: events.PPOutcomeContinue, @@ -475,5 +476,49 @@ var _ = Describe("Async file uploads", Ordered, func() { Expect(item.Path).To(Equal(ref.Path)) Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal("")) }) + + FIt("correctly calculates the size when the second upload is finishes, even if first is deleted", func() { + // finish postprocessing of 
second upload + con <- events.PostprocessingFinished{ + UploadID: secondUploadID, + Outcome: events.PPOutcomeContinue, + } + // wait for upload to be ready + ev, ok := (<-pub).(events.UploadReady) + Expect(ok).To(BeTrue()) + Expect(ev.Failed).To(BeFalse()) + + // check processing status + resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(resources)).To(Equal(1)) + + item := resources[0] + Expect(item.Path).To(Equal(ref.Path)) + Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal("")) + + // size should match the second upload + Expect(item.Size).To(Equal(uint64(len(file2Content)))) + + // finish postprocessing of first upload + con <- events.PostprocessingFinished{ + UploadID: uploadID, + // Outcome: events.PPOutcomeDelete, // This will completely delete the file + Outcome: events.PPOutcomeAbort, // This as well ... fck + } + // wait for upload to be ready + ev, ok = (<-pub).(events.UploadReady) + Expect(ok).To(BeTrue()) + Expect(ev.Failed).To(BeTrue()) + + // check processing status + resources, err = fs.ListFolder(ctx, rootRef, []string{}, []string{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(resources)).To(Equal(1)) + + // size should still match the second upload + Expect(item.Size).To(Equal(uint64(len(file2Content)))) + + }) }) }) From bf7c385bca6c368a4e20d9ef48ba0d330b3284a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 9 Apr 2024 15:38:28 +0200 Subject: [PATCH 3/6] only propagate sizediff if no other upload was started MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../utils/decomposedfs/decomposedfs.go | 13 +++++++++--- pkg/storage/utils/decomposedfs/node/node.go | 6 ++++++ .../utils/decomposedfs/upload/upload.go | 18 +++++++++++----- .../utils/decomposedfs/upload_async_test.go | 21 ++++++++++++++----- 4 files changed, 45 insertions(+), 13 
deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index ac7fdfe5eb..c3e9add631 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -309,9 +309,16 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { now := time.Now() if failed { - // propagate sizeDiff after failed postprocessing - if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + log.Error().Err(err).Str("node", n.ID).Str("uploadID", ev.UploadID).Msg("reading node for session failed") + } + if latestSession == session.ID() { + // propagate reverted sizeDiff after failed postprocessing + if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") + } } } else if p := getParent(); p != nil { // update parent tmtime to propagate etag change after successful postprocessing diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 5907df490f..404c8f4e56 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -1276,6 +1276,12 @@ func (n *Node) IsProcessing(ctx context.Context) bool { return err == nil && strings.HasPrefix(v, ProcessingStatus) } +// ProcessingID returns the latest upload session id +func (n *Node) ProcessingID(ctx context.Context) (string, error) { + v, err := n.XattrString(ctx, prefixes.StatusPrefix) + return strings.TrimPrefix(v, ProcessingStatus), err +} + // IsSpaceRoot checks if the node is a space root func (n *Node) IsSpaceRoot(ctx context.Context) bool { 
_, err := n.Xattr(ctx, prefixes.SpaceNameAttr) diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 6016ec84d2..98605805f7 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -314,12 +314,12 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool ctx := session.Context(context.Background()) if revertNodeMetadata { + n, err := session.Node(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading node for session failed") + } if session.NodeExists() { p := session.info.MetaData["versionsPath"] - n, err := session.Node(ctx) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed") - } if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) { return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || attributeName == prefixes.TypeAttr || @@ -335,7 +335,15 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool } } else { - session.removeNode(ctx) + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Str("sessionid", session.ID()).Msg("reading processingid for session failed") + } + if latestSession == session.ID() { + // actually delete the node + session.removeNode(ctx) + } } } diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index dc1ba38a27..63d7b950cb 100644 --- a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -85,8 +85,10 @@ var _ = 
Describe("Async file uploads", Ordered, func() { Expect(err).ToNot(HaveOccurred()) o, err = options.New(map[string]interface{}{ - "root": tmpRoot, - "asyncfileuploads": true, + "root": tmpRoot, + "asyncfileuploads": true, + "treetime_accounting": true, + "treesize_accounting": true, }) Expect(err).ToNot(HaveOccurred()) @@ -477,7 +479,7 @@ var _ = Describe("Async file uploads", Ordered, func() { Expect(utils.ReadPlainFromOpaque(item.Opaque, "status")).To(Equal("")) }) - FIt("correctly calculates the size when the second upload is finishes, even if first is deleted", func() { + It("correctly calculates the size when the second upload is finishes, even if first is deleted", func() { // finish postprocessing of second upload con <- events.PostprocessingFinished{ UploadID: secondUploadID, @@ -500,11 +502,16 @@ var _ = Describe("Async file uploads", Ordered, func() { // size should match the second upload Expect(item.Size).To(Equal(uint64(len(file2Content)))) + // parent size should match second upload as well + parentInfo, err := fs.GetMD(ctx, rootRef, []string{}, []string{}) + Expect(err).ToNot(HaveOccurred()) + Expect(parentInfo.Size).To(Equal(uint64(len(file2Content)))) + // finish postprocessing of first upload con <- events.PostprocessingFinished{ UploadID: uploadID, - // Outcome: events.PPOutcomeDelete, // This will completely delete the file - Outcome: events.PPOutcomeAbort, // This as well ... fck + Outcome: events.PPOutcomeDelete, + // Outcome: events.PPOutcomeAbort, // This as well ... 
 fck } // wait for upload to be ready ev, ok = (<-pub).(events.UploadReady) @@ -519,6 +526,10 @@ var _ = Describe("Async file uploads", Ordered, func() { // size should still match the second upload Expect(item.Size).To(Equal(uint64(len(file2Content)))) + // parent size should still match second upload as well + parentInfo, err = fs.GetMD(ctx, rootRef, []string{}, []string{}) + Expect(err).ToNot(HaveOccurred()) + Expect(parentInfo.Size).To(Equal(uint64(len(file2Content)))) }) }) }) From b592649d82d180ddbf48e5935863390d2a727303 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 9 Apr 2024 15:52:57 +0200 Subject: [PATCH 4/6] add changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/fix-blobstore.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/fix-blobstore.md diff --git a/changelog/unreleased/fix-blobstore.md b/changelog/unreleased/fix-blobstore.md new file mode 100644 index 0000000000..d28a5ca4cc --- /dev/null +++ b/changelog/unreleased/fix-blobstore.md @@ -0,0 +1,5 @@ +Bugfix: write blob based on session id + +Decomposedfs now uses the session id and size when moving an upload to the blobstore. This fixes a corner case that prevents an upload session from correctly being finished when another upload session to the file was started and already finished. 
+ +https://github.com/cs3org/reva/pull/4615 From f6f30f2b2c51128463c5857db51b0c93ef6d1c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Wed, 10 Apr 2024 11:05:15 +0200 Subject: [PATCH 5/6] add fixme for known bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload/upload.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 98605805f7..3bfc4375da 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -344,6 +344,7 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool // actually delete the node session.removeNode(ctx) } + // FIXME else if the upload has become a revision, delete the revision } } From 753dae0437f2b6eb2c6faa31e8ed084d1facb317 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Wed, 10 Apr 2024 12:54:17 +0200 Subject: [PATCH 6/6] revert blobstore signature changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/fs/ocis/blobstore/blobstore.go | 31 +++---- .../fs/ocis/blobstore/blobstore_test.go | 18 +++-- pkg/storage/fs/posix/blobstore/blobstore.go | 32 +++----- .../fs/posix/blobstore/blobstore_test.go | 18 +++-- pkg/storage/fs/posix/tree/mocks/Blobstore.go | 80 +++++++++---------- pkg/storage/fs/posix/tree/tree.go | 32 +++++--- pkg/storage/fs/s3ng/blobstore/blobstore.go | 48 ++++------- .../utils/decomposedfs/decomposedfs.go | 2 +- .../utils/decomposedfs/node/mocks/Tree.go | 79 +++++++++--------- pkg/storage/utils/decomposedfs/node/node.go | 6 +- .../utils/decomposedfs/recycle_test.go | 8 +- pkg/storage/utils/decomposedfs/revisions.go | 7 +- .../decomposedfs/tree/mocks/Blobstore.go | 80 
+++++++++---------- pkg/storage/utils/decomposedfs/tree/tree.go | 32 +++++--- .../utils/decomposedfs/tree/tree_test.go | 8 +- .../utils/decomposedfs/upload/upload.go | 12 +-- .../utils/decomposedfs/upload_async_test.go | 9 ++- pkg/storage/utils/decomposedfs/upload_test.go | 10 +-- 18 files changed, 243 insertions(+), 269 deletions(-) diff --git a/pkg/storage/fs/ocis/blobstore/blobstore.go b/pkg/storage/fs/ocis/blobstore/blobstore.go index b04145b8ee..7afd3eeb55 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore.go @@ -26,6 +26,7 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -48,8 +49,8 @@ func New(root string) (*Blobstore, error) { } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Upload(node *node.Node, source string) error { + dest, err := bs.path(node) if err != nil { return err } @@ -84,8 +85,8 @@ func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source strin } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { + dest, err := bs.path(node) if err != nil { return nil, err } @@ -93,19 +94,12 @@ func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCl if err != nil { return nil, errors.Wrapf(err, "could not read blob '%s'", dest) } - fi, err := file.Stat() - if err != nil { - return nil, errors.Wrapf(err, "could not stat blob '%s'", dest) - } - if fi.Size() != blobSize { - return nil, fmt.Errorf("blob has 
unexpected size. %d bytes expected, got %d bytes", blobSize, fi.Size()) - } return file, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(spaceID, blobID string) error { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Delete(node *node.Node) error { + dest, err := bs.path(node) if err != nil { return err } @@ -115,17 +109,14 @@ func (bs *Blobstore) Delete(spaceID, blobID string) error { return nil } -func (bs *Blobstore) path(spaceID, blobID string) (string, error) { - if spaceID == "" { - return "", fmt.Errorf("blobstore: spaceID is empty") - } - if blobID == "" { - return "", fmt.Errorf("blobstore: blobID is empty") +func (bs *Blobstore) path(node *node.Node) (string, error) { + if node.BlobID == "" { + return "", fmt.Errorf("blobstore: BlobID is empty") } return filepath.Join( bs.root, filepath.Clean(filepath.Join( - "/", "spaces", lookup.Pathify(spaceID, 1, 2), "blobs", lookup.Pathify(blobID, 4, 2)), + "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)), ), ), nil } diff --git a/pkg/storage/fs/ocis/blobstore/blobstore_test.go b/pkg/storage/fs/ocis/blobstore/blobstore_test.go index 114148f6de..e895497abc 100644 --- a/pkg/storage/fs/ocis/blobstore/blobstore_test.go +++ b/pkg/storage/fs/ocis/blobstore/blobstore_test.go @@ -24,6 +24,7 @@ import ( "path" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/tests/helpers" . 
"github.com/onsi/ginkgo/v2" @@ -33,8 +34,7 @@ import ( var _ = Describe("Blobstore", func() { var ( tmpRoot string - spaceID string - blobID string + blobNode *node.Node blobPath string blobSrcFile string data []byte @@ -48,8 +48,10 @@ var _ = Describe("Blobstore", func() { Expect(err).ToNot(HaveOccurred()) data = []byte("1234567890") - spaceID = "wonderfullspace" - blobID = "huuuuugeblob" + blobNode = &node.Node{ + SpaceID: "wonderfullspace", + BlobID: "huuuuugeblob", + } blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob") blobSrcFile = path.Join(tmpRoot, "blobsrc") @@ -75,7 +77,7 @@ var _ = Describe("Blobstore", func() { Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed()) }) It("writes the blob", func() { - err := bs.Upload(spaceID, blobID, int64(len(data)), blobSrcFile) + err := bs.Upload(blobNode, blobSrcFile) Expect(err).ToNot(HaveOccurred()) writtenBytes, err := os.ReadFile(blobPath) @@ -93,7 +95,7 @@ var _ = Describe("Blobstore", func() { Describe("Download", func() { It("cleans the key", func() { - reader, err := bs.Download(spaceID, blobID, int64(len(data))) + reader, err := bs.Download(blobNode) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -102,7 +104,7 @@ var _ = Describe("Blobstore", func() { }) It("returns a reader to the blob", func() { - reader, err := bs.Download(spaceID, blobID, int64(len(data))) + reader, err := bs.Download(blobNode) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -116,7 +118,7 @@ var _ = Describe("Blobstore", func() { _, err := os.Stat(blobPath) Expect(err).ToNot(HaveOccurred()) - err = bs.Delete(spaceID, blobID) + err = bs.Delete(blobNode) Expect(err).ToNot(HaveOccurred()) _, err = os.Stat(blobPath) diff --git a/pkg/storage/fs/posix/blobstore/blobstore.go b/pkg/storage/fs/posix/blobstore/blobstore.go index d61a05acad..7afd3eeb55 100644 --- a/pkg/storage/fs/posix/blobstore/blobstore.go +++ 
b/pkg/storage/fs/posix/blobstore/blobstore.go @@ -26,6 +26,7 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -48,8 +49,8 @@ func New(root string) (*Blobstore, error) { } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Upload(node *node.Node, source string) error { + dest, err := bs.path(node) if err != nil { return err } @@ -84,8 +85,8 @@ func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source strin } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { + dest, err := bs.path(node) if err != nil { return nil, err } @@ -93,20 +94,12 @@ func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCl if err != nil { return nil, errors.Wrapf(err, "could not read blob '%s'", dest) } - fi, err := file.Stat() - if err != nil { - return nil, errors.Wrapf(err, "could not stat blob '%s'", dest) - } - if fi.Size() != blobSize { - return nil, fmt.Errorf("blob has unexpected size. 
%d bytes expected, got %d bytes", blobSize, fi.Size()) - } - return file, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(spaceID, blobID string) error { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Delete(node *node.Node) error { + dest, err := bs.path(node) if err != nil { return err } @@ -116,17 +109,14 @@ func (bs *Blobstore) Delete(spaceID, blobID string) error { return nil } -func (bs *Blobstore) path(spaceID, blobID string) (string, error) { - if spaceID == "" { - return "", fmt.Errorf("blobstore: spaceID is empty") - } - if blobID == "" { - return "", fmt.Errorf("blobstore: blobID is empty") +func (bs *Blobstore) path(node *node.Node) (string, error) { + if node.BlobID == "" { + return "", fmt.Errorf("blobstore: BlobID is empty") } return filepath.Join( bs.root, filepath.Clean(filepath.Join( - "/", "spaces", lookup.Pathify(spaceID, 1, 2), "blobs", lookup.Pathify(blobID, 4, 2)), + "/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)), ), ), nil } diff --git a/pkg/storage/fs/posix/blobstore/blobstore_test.go b/pkg/storage/fs/posix/blobstore/blobstore_test.go index 114148f6de..e895497abc 100644 --- a/pkg/storage/fs/posix/blobstore/blobstore_test.go +++ b/pkg/storage/fs/posix/blobstore/blobstore_test.go @@ -24,6 +24,7 @@ import ( "path" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/tests/helpers" . 
"github.com/onsi/ginkgo/v2" @@ -33,8 +34,7 @@ import ( var _ = Describe("Blobstore", func() { var ( tmpRoot string - spaceID string - blobID string + blobNode *node.Node blobPath string blobSrcFile string data []byte @@ -48,8 +48,10 @@ var _ = Describe("Blobstore", func() { Expect(err).ToNot(HaveOccurred()) data = []byte("1234567890") - spaceID = "wonderfullspace" - blobID = "huuuuugeblob" + blobNode = &node.Node{ + SpaceID: "wonderfullspace", + BlobID: "huuuuugeblob", + } blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob") blobSrcFile = path.Join(tmpRoot, "blobsrc") @@ -75,7 +77,7 @@ var _ = Describe("Blobstore", func() { Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed()) }) It("writes the blob", func() { - err := bs.Upload(spaceID, blobID, int64(len(data)), blobSrcFile) + err := bs.Upload(blobNode, blobSrcFile) Expect(err).ToNot(HaveOccurred()) writtenBytes, err := os.ReadFile(blobPath) @@ -93,7 +95,7 @@ var _ = Describe("Blobstore", func() { Describe("Download", func() { It("cleans the key", func() { - reader, err := bs.Download(spaceID, blobID, int64(len(data))) + reader, err := bs.Download(blobNode) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -102,7 +104,7 @@ var _ = Describe("Blobstore", func() { }) It("returns a reader to the blob", func() { - reader, err := bs.Download(spaceID, blobID, int64(len(data))) + reader, err := bs.Download(blobNode) Expect(err).ToNot(HaveOccurred()) readData, err := io.ReadAll(reader) @@ -116,7 +118,7 @@ var _ = Describe("Blobstore", func() { _, err := os.Stat(blobPath) Expect(err).ToNot(HaveOccurred()) - err = bs.Delete(spaceID, blobID) + err = bs.Delete(blobNode) Expect(err).ToNot(HaveOccurred()) _, err = os.Stat(blobPath) diff --git a/pkg/storage/fs/posix/tree/mocks/Blobstore.go b/pkg/storage/fs/posix/tree/mocks/Blobstore.go index 01640bbc20..3f23a832e1 100644 --- a/pkg/storage/fs/posix/tree/mocks/Blobstore.go +++ 
b/pkg/storage/fs/posix/tree/mocks/Blobstore.go @@ -23,6 +23,7 @@ package mocks import ( io "io" + node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" mock "github.com/stretchr/testify/mock" ) @@ -39,17 +40,17 @@ func (_m *Blobstore) EXPECT() *Blobstore_Expecter { return &Blobstore_Expecter{mock: &_m.Mock} } -// Delete provides a mock function with given fields: spaceID, blobID -func (_m *Blobstore) Delete(spaceID string, blobID string) error { - ret := _m.Called(spaceID, blobID) +// Delete provides a mock function with given fields: _a0 +func (_m *Blobstore) Delete(_a0 *node.Node) error { + ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for Delete") } var r0 error - if rf, ok := ret.Get(0).(func(string, string) error); ok { - r0 = rf(spaceID, blobID) + if rf, ok := ret.Get(0).(func(*node.Node) error); ok { + r0 = rf(_a0) } else { r0 = ret.Error(0) } @@ -63,15 +64,14 @@ type Blobstore_Delete_Call struct { } // Delete is a helper method to define mock.On call -// - spaceID string -// - blobID string -func (_e *Blobstore_Expecter) Delete(spaceID interface{}, blobID interface{}) *Blobstore_Delete_Call { - return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", spaceID, blobID)} +// - _a0 *node.Node +func (_e *Blobstore_Expecter) Delete(_a0 interface{}) *Blobstore_Delete_Call { + return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", _a0)} } -func (_c *Blobstore_Delete_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) Run(run func(_a0 *node.Node)) *Blobstore_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string)) + run(args[0].(*node.Node)) }) return _c } @@ -81,14 +81,14 @@ func (_c *Blobstore_Delete_Call) Return(_a0 error) *Blobstore_Delete_Call { return _c } -func (_c *Blobstore_Delete_Call) RunAndReturn(run func(string, string) error) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) RunAndReturn(run 
func(*node.Node) error) *Blobstore_Delete_Call { _c.Call.Return(run) return _c } -// Download provides a mock function with given fields: spaceID, blobID, blobSize -func (_m *Blobstore) Download(spaceID string, blobID string, blobSize int64) (io.ReadCloser, error) { - ret := _m.Called(spaceID, blobID, blobSize) +// Download provides a mock function with given fields: _a0 +func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { + ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for Download") @@ -96,19 +96,19 @@ func (_m *Blobstore) Download(spaceID string, blobID string, blobSize int64) (io var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(string, string, int64) (io.ReadCloser, error)); ok { - return rf(spaceID, blobID, blobSize) + if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { + return rf(_a0) } - if rf, ok := ret.Get(0).(func(string, string, int64) io.ReadCloser); ok { - r0 = rf(spaceID, blobID, blobSize) + if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { + r0 = rf(_a0) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { - r1 = rf(spaceID, blobID, blobSize) + if rf, ok := ret.Get(1).(func(*node.Node) error); ok { + r1 = rf(_a0) } else { r1 = ret.Error(1) } @@ -122,16 +122,14 @@ type Blobstore_Download_Call struct { } // Download is a helper method to define mock.On call -// - spaceID string -// - blobID string -// - blobSize int64 -func (_e *Blobstore_Expecter) Download(spaceID interface{}, blobID interface{}, blobSize interface{}) *Blobstore_Download_Call { - return &Blobstore_Download_Call{Call: _e.mock.On("Download", spaceID, blobID, blobSize)} +// - _a0 *node.Node +func (_e *Blobstore_Expecter) Download(_a0 interface{}) *Blobstore_Download_Call { + return &Blobstore_Download_Call{Call: _e.mock.On("Download", _a0)} } -func (_c *Blobstore_Download_Call) Run(run func(spaceID string, 
blobID string, blobSize int64)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) Run(run func(_a0 *node.Node)) *Blobstore_Download_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string), args[2].(int64)) + run(args[0].(*node.Node)) }) return _c } @@ -141,22 +139,22 @@ func (_c *Blobstore_Download_Call) Return(_a0 io.ReadCloser, _a1 error) *Blobsto return _c } -func (_c *Blobstore_Download_Call) RunAndReturn(run func(string, string, int64) (io.ReadCloser, error)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Blobstore_Download_Call { _c.Call.Return(run) return _c } -// Upload provides a mock function with given fields: spaceID, blobID, blobSize, source -func (_m *Blobstore) Upload(spaceID string, blobID string, blobSize int64, source string) error { - ret := _m.Called(spaceID, blobID, blobSize, source) +// Upload provides a mock function with given fields: _a0, source +func (_m *Blobstore) Upload(_a0 *node.Node, source string) error { + ret := _m.Called(_a0, source) if len(ret) == 0 { panic("no return value specified for Upload") } var r0 error - if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { - r0 = rf(spaceID, blobID, blobSize, source) + if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { + r0 = rf(_a0, source) } else { r0 = ret.Error(0) } @@ -170,17 +168,15 @@ type Blobstore_Upload_Call struct { } // Upload is a helper method to define mock.On call -// - spaceID string -// - blobID string -// - blobSize int64 +// - _a0 *node.Node // - source string -func (_e *Blobstore_Expecter) Upload(spaceID interface{}, blobID interface{}, blobSize interface{}, source interface{}) *Blobstore_Upload_Call { - return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", spaceID, blobID, blobSize, source)} +func (_e *Blobstore_Expecter) Upload(_a0 interface{}, source interface{}) *Blobstore_Upload_Call { + return 
&Blobstore_Upload_Call{Call: _e.mock.On("Upload", _a0, source)} } -func (_c *Blobstore_Upload_Call) Run(run func(spaceID string, blobID string, blobSize int64, source string)) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) Run(run func(_a0 *node.Node, source string)) *Blobstore_Upload_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) + run(args[0].(*node.Node), args[1].(string)) }) return _c } @@ -190,7 +186,7 @@ func (_c *Blobstore_Upload_Call) Return(_a0 error) *Blobstore_Upload_Call { return _c } -func (_c *Blobstore_Upload_Call) RunAndReturn(run func(string, string, int64, string) error) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) RunAndReturn(run func(*node.Node, string) error) *Blobstore_Upload_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/fs/posix/tree/tree.go b/pkg/storage/fs/posix/tree/tree.go index ad4ed80ffc..b157133cac 100644 --- a/pkg/storage/fs/posix/tree/tree.go +++ b/pkg/storage/fs/posix/tree/tree.go @@ -21,6 +21,7 @@ package tree import ( "bytes" "context" + "fmt" "io" "io/fs" "os" @@ -55,9 +56,9 @@ func init() { // Blobstore defines an interface for storing blobs in a blobstore type Blobstore interface { - Upload(spaceID, blobID string, blobSize int64, source string) error - Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) - Delete(spaceID, blobID string) error + Upload(node *node.Node, source string) error + Download(node *node.Node) (io.ReadCloser, error) + Delete(node *node.Node) error } // Tree manages a hierarchical tree @@ -615,7 +616,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n.SpaceID, n.BlobID); err != nil { + if err := t.DeleteBlob(n); err != nil { log.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -644,7 +645,7 @@ func (t *Tree) 
removeNode(ctx context.Context, path string, n *node.Node) error } if bID != "" { - if err := t.DeleteBlob(n.SpaceID, bID); err != nil { + if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { log.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -661,22 +662,29 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(spaceID, blobID string, blobSize int64, source string) error { - return t.blobstore.Upload(spaceID, blobID, blobSize, source) +func (t *Tree) WriteBlob(node *node.Node, source string) error { + return t.blobstore.Upload(node, source) } // ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { - if blobID == "" { +func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { + if node.BlobID == "" { // there is no blob yet - we are dealing with a 0 byte file return io.NopCloser(bytes.NewReader([]byte{})), nil } - return t.blobstore.Download(spaceID, blobID, blobSize) + return t.blobstore.Download(node) } // DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(spaceID, blobID string) error { - return t.blobstore.Delete(spaceID, blobID) +func (t *Tree) DeleteBlob(node *node.Node) error { + if node == nil { + return fmt.Errorf("could not delete blob, nil node was given") + } + if node.BlobID == "" { + return fmt.Errorf("could not delete blob, node with empty blob id was given") + } + + return t.blobstore.Delete(node) } // TODO check if node exists? 
diff --git a/pkg/storage/fs/s3ng/blobstore/blobstore.go b/pkg/storage/fs/s3ng/blobstore/blobstore.go index 3150ed98c1..48a40b764f 100644 --- a/pkg/storage/fs/s3ng/blobstore/blobstore.go +++ b/pkg/storage/fs/s3ng/blobstore/blobstore.go @@ -27,6 +27,7 @@ import ( "path/filepath" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" @@ -75,19 +76,14 @@ func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOption } // Upload stores some data in the blobstore under the given key -func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source string) error { - dest, err := bs.path(spaceID, blobID) - if err != nil { - return err - } - +func (bs *Blobstore) Upload(node *node.Node, source string) error { reader, err := os.Open(source) if err != nil { return errors.Wrap(err, "can not open source file to upload") } defer reader.Close() - _, err = bs.client.PutObject(context.Background(), bs.bucket, dest, reader, blobSize, minio.PutObjectOptions{ + _, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ ContentType: "application/octet-stream", SendContentMd5: bs.defaultPutOptions.SendContentMd5, ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts, @@ -98,54 +94,40 @@ func (bs *Blobstore) Upload(spaceID, blobID string, blobSize int64, source strin }) if err != nil { - return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", dest, bs.bucket) + return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket) } return nil } // Download retrieves a blob from the blobstore for reading -func (bs *Blobstore) Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { - dest, err := bs.path(spaceID, blobID) - if err != nil { - return 
nil, err - } - reader, err := bs.client.GetObject(context.Background(), bs.bucket, dest, minio.GetObjectOptions{}) +func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) { + reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.path(node), minio.GetObjectOptions{}) if err != nil { - return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", dest, bs.bucket) + return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.path(node), bs.bucket) } stat, err := reader.Stat() if err != nil { - return nil, errors.Wrapf(err, "blob path: %s", dest) + return nil, errors.Wrapf(err, "blob path: %s", bs.path(node)) } - if blobSize != stat.Size { - return nil, fmt.Errorf("blob has unexpected size. %d bytes expected, got %d bytes", blobSize, stat.Size) + if node.Blobsize != stat.Size { + return nil, fmt.Errorf("blob has unexpected size. %d bytes expected, got %d bytes", node.Blobsize, stat.Size) } return reader, nil } // Delete deletes a blob from the blobstore -func (bs *Blobstore) Delete(spaceID, blobID string) error { - dest, err := bs.path(spaceID, blobID) +func (bs *Blobstore) Delete(node *node.Node) error { + err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.path(node), minio.RemoveObjectOptions{}) if err != nil { - return err - } - err = bs.client.RemoveObject(context.Background(), bs.bucket, dest, minio.RemoveObjectOptions{}) - if err != nil { - return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", dest, bs.bucket) + return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.path(node), bs.bucket) } return nil } -func (bs *Blobstore) path(spaceID, blobID string) (string, error) { - if spaceID == "" { - return "", fmt.Errorf("blobstore: spaceID is empty") - } - if blobID == "" { - return "", fmt.Errorf("blobstore: blobID is empty") - } +func (bs *Blobstore) path(node *node.Node) string { // 
https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/ // Prefixes are used to partion a bucket. A prefix is everything except the filename. // For a file `BucketName/foo/bar/lorem.ipsum`, `BucketName/foo/bar/` is the prefix. @@ -154,5 +136,5 @@ func (bs *Blobstore) path(spaceID, blobID string) (string, error) { // // Since the spaceID is always the same for a space, we don't need to pathify that, because it would // not yield any performance gains - return filepath.Clean(filepath.Join(spaceID, lookup.Pathify(blobID, 4, 2))), nil + return filepath.Clean(filepath.Join(node.SpaceID, lookup.Pathify(node.BlobID, 4, 2))) } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index c3e9add631..d1ebb61e2f 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -1052,7 +1052,7 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) ( if currentEtag != expectedEtag { return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag)) } - reader, err := fs.tp.ReadBlob(n.SpaceID, n.BlobID, n.Blobsize) + reader, err := fs.tp.ReadBlob(n) if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'") } diff --git a/pkg/storage/utils/decomposedfs/node/mocks/Tree.go b/pkg/storage/utils/decomposedfs/node/mocks/Tree.go index 410cc1cd82..d6a07c8137 100644 --- a/pkg/storage/utils/decomposedfs/node/mocks/Tree.go +++ b/pkg/storage/utils/decomposedfs/node/mocks/Tree.go @@ -138,17 +138,17 @@ func (_c *Tree_Delete_Call) RunAndReturn(run func(context.Context, *node.Node) e return _c } -// DeleteBlob provides a mock function with given fields: spaceID, blobId -func (_m *Tree) DeleteBlob(spaceID string, blobId string) error { - ret := _m.Called(spaceID, blobId) +// DeleteBlob provides a mock function with given fields: _a0 +func (_m *Tree) 
DeleteBlob(_a0 *node.Node) error { + ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for DeleteBlob") } var r0 error - if rf, ok := ret.Get(0).(func(string, string) error); ok { - r0 = rf(spaceID, blobId) + if rf, ok := ret.Get(0).(func(*node.Node) error); ok { + r0 = rf(_a0) } else { r0 = ret.Error(0) } @@ -162,15 +162,14 @@ type Tree_DeleteBlob_Call struct { } // DeleteBlob is a helper method to define mock.On call -// - spaceID string -// - blobId string -func (_e *Tree_Expecter) DeleteBlob(spaceID interface{}, blobId interface{}) *Tree_DeleteBlob_Call { - return &Tree_DeleteBlob_Call{Call: _e.mock.On("DeleteBlob", spaceID, blobId)} +// - _a0 *node.Node +func (_e *Tree_Expecter) DeleteBlob(_a0 interface{}) *Tree_DeleteBlob_Call { + return &Tree_DeleteBlob_Call{Call: _e.mock.On("DeleteBlob", _a0)} } -func (_c *Tree_DeleteBlob_Call) Run(run func(spaceID string, blobId string)) *Tree_DeleteBlob_Call { +func (_c *Tree_DeleteBlob_Call) Run(run func(_a0 *node.Node)) *Tree_DeleteBlob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string)) + run(args[0].(*node.Node)) }) return _c } @@ -180,7 +179,7 @@ func (_c *Tree_DeleteBlob_Call) Return(_a0 error) *Tree_DeleteBlob_Call { return _c } -func (_c *Tree_DeleteBlob_Call) RunAndReturn(run func(string, string) error) *Tree_DeleteBlob_Call { +func (_c *Tree_DeleteBlob_Call) RunAndReturn(run func(*node.Node) error) *Tree_DeleteBlob_Call { _c.Call.Return(run) return _c } @@ -469,9 +468,9 @@ func (_c *Tree_PurgeRecycleItemFunc_Call) RunAndReturn(run func(context.Context, return _c } -// ReadBlob provides a mock function with given fields: spaceID, blobId, blobSize -func (_m *Tree) ReadBlob(spaceID string, blobId string, blobSize int64) (io.ReadCloser, error) { - ret := _m.Called(spaceID, blobId, blobSize) +// ReadBlob provides a mock function with given fields: _a0 +func (_m *Tree) ReadBlob(_a0 *node.Node) (io.ReadCloser, error) { + ret := _m.Called(_a0) if len(ret) == 0 
{ panic("no return value specified for ReadBlob") @@ -479,19 +478,19 @@ func (_m *Tree) ReadBlob(spaceID string, blobId string, blobSize int64) (io.Read var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(string, string, int64) (io.ReadCloser, error)); ok { - return rf(spaceID, blobId, blobSize) + if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { + return rf(_a0) } - if rf, ok := ret.Get(0).(func(string, string, int64) io.ReadCloser); ok { - r0 = rf(spaceID, blobId, blobSize) + if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { + r0 = rf(_a0) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { - r1 = rf(spaceID, blobId, blobSize) + if rf, ok := ret.Get(1).(func(*node.Node) error); ok { + r1 = rf(_a0) } else { r1 = ret.Error(1) } @@ -505,16 +504,14 @@ type Tree_ReadBlob_Call struct { } // ReadBlob is a helper method to define mock.On call -// - spaceID string -// - blobId string -// - blobSize int64 -func (_e *Tree_Expecter) ReadBlob(spaceID interface{}, blobId interface{}, blobSize interface{}) *Tree_ReadBlob_Call { - return &Tree_ReadBlob_Call{Call: _e.mock.On("ReadBlob", spaceID, blobId, blobSize)} +// - _a0 *node.Node +func (_e *Tree_Expecter) ReadBlob(_a0 interface{}) *Tree_ReadBlob_Call { + return &Tree_ReadBlob_Call{Call: _e.mock.On("ReadBlob", _a0)} } -func (_c *Tree_ReadBlob_Call) Run(run func(spaceID string, blobId string, blobSize int64)) *Tree_ReadBlob_Call { +func (_c *Tree_ReadBlob_Call) Run(run func(_a0 *node.Node)) *Tree_ReadBlob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string), args[2].(int64)) + run(args[0].(*node.Node)) }) return _c } @@ -524,7 +521,7 @@ func (_c *Tree_ReadBlob_Call) Return(_a0 io.ReadCloser, _a1 error) *Tree_ReadBlo return _c } -func (_c *Tree_ReadBlob_Call) RunAndReturn(run func(string, string, int64) (io.ReadCloser, error)) *Tree_ReadBlob_Call { +func (_c 
*Tree_ReadBlob_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Tree_ReadBlob_Call { _c.Call.Return(run) return _c } @@ -703,17 +700,17 @@ func (_c *Tree_TouchFile_Call) RunAndReturn(run func(context.Context, *node.Node return _c } -// WriteBlob provides a mock function with given fields: spaceID, blobId, blobSize, source -func (_m *Tree) WriteBlob(spaceID string, blobId string, blobSize int64, source string) error { - ret := _m.Called(spaceID, blobId, blobSize, source) +// WriteBlob provides a mock function with given fields: _a0, source +func (_m *Tree) WriteBlob(_a0 *node.Node, source string) error { + ret := _m.Called(_a0, source) if len(ret) == 0 { panic("no return value specified for WriteBlob") } var r0 error - if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { - r0 = rf(spaceID, blobId, blobSize, source) + if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { + r0 = rf(_a0, source) } else { r0 = ret.Error(0) } @@ -727,17 +724,15 @@ type Tree_WriteBlob_Call struct { } // WriteBlob is a helper method to define mock.On call -// - spaceID string -// - blobId string -// - blobSize int64 +// - _a0 *node.Node // - source string -func (_e *Tree_Expecter) WriteBlob(spaceID interface{}, blobId interface{}, blobSize interface{}, source interface{}) *Tree_WriteBlob_Call { - return &Tree_WriteBlob_Call{Call: _e.mock.On("WriteBlob", spaceID, blobId, blobSize, source)} +func (_e *Tree_Expecter) WriteBlob(_a0 interface{}, source interface{}) *Tree_WriteBlob_Call { + return &Tree_WriteBlob_Call{Call: _e.mock.On("WriteBlob", _a0, source)} } -func (_c *Tree_WriteBlob_Call) Run(run func(spaceID string, blobId string, blobSize int64, source string)) *Tree_WriteBlob_Call { +func (_c *Tree_WriteBlob_Call) Run(run func(_a0 *node.Node, source string)) *Tree_WriteBlob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) + run(args[0].(*node.Node), args[1].(string)) 
}) return _c } @@ -747,7 +742,7 @@ func (_c *Tree_WriteBlob_Call) Return(_a0 error) *Tree_WriteBlob_Call { return _c } -func (_c *Tree_WriteBlob_Call) RunAndReturn(run func(string, string, int64, string) error) *Tree_WriteBlob_Call { +func (_c *Tree_WriteBlob_Call) RunAndReturn(run func(*node.Node, string) error) *Tree_WriteBlob_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 404c8f4e56..db22961fd2 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -99,9 +99,9 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error) - WriteBlob(spaceID, blobId string, blobSize int64, source string) error - ReadBlob(spaceID, blobId string, blobSize int64) (io.ReadCloser, error) - DeleteBlob(spaceID, blobId string) error + WriteBlob(node *Node, source string) error + ReadBlob(node *Node) (io.ReadCloser, error) + DeleteBlob(node *Node) error Propagate(ctx context.Context, node *Node, sizeDiff int64) (err error) } diff --git a/pkg/storage/utils/decomposedfs/recycle_test.go b/pkg/storage/utils/decomposedfs/recycle_test.go index 7dd0f365d9..1af163d093 100644 --- a/pkg/storage/utils/decomposedfs/recycle_test.go +++ b/pkg/storage/utils/decomposedfs/recycle_test.go @@ -86,7 +86,7 @@ var _ = Describe("Recycle", func() { It("they can be permanently deleted by this user", func() { // mock call to blobstore - env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -104,7 +104,7 @@ var _ = 
Describe("Recycle", func() { }) It("they can be restored", func() { - env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -177,7 +177,7 @@ var _ = Describe("Recycle", func() { }) It("they can be permanently deleted by the other user", func() { - env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) @@ -206,7 +206,7 @@ var _ = Describe("Recycle", func() { }) It("they can be restored by the other user", func() { - env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil).Times(2) + env.Blobstore.On("Delete", mock.Anything).Return(nil).Times(2) items, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index 0775c354cd..c079c3bddb 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -152,13 +152,14 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey) } - blobsize, err := fs.lu.ReadBlobSizeAttr(ctx, contentPath) if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not read blob size of revision '%s' for node '%s'", n.ID, revisionKey) } - reader, err := fs.tp.ReadBlob(spaceID, blobid, blobsize) + revisionNode 
:= node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore + + reader, err := fs.tp.ReadBlob(&revisionNode) if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey) } @@ -319,7 +320,7 @@ func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Refere return err } - return fs.tp.DeleteBlob(n.SpaceID, n.BlobID) + return fs.tp.DeleteBlob(n) } func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) { diff --git a/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go b/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go index 01640bbc20..3f23a832e1 100644 --- a/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go +++ b/pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go @@ -23,6 +23,7 @@ package mocks import ( io "io" + node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" mock "github.com/stretchr/testify/mock" ) @@ -39,17 +40,17 @@ func (_m *Blobstore) EXPECT() *Blobstore_Expecter { return &Blobstore_Expecter{mock: &_m.Mock} } -// Delete provides a mock function with given fields: spaceID, blobID -func (_m *Blobstore) Delete(spaceID string, blobID string) error { - ret := _m.Called(spaceID, blobID) +// Delete provides a mock function with given fields: _a0 +func (_m *Blobstore) Delete(_a0 *node.Node) error { + ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for Delete") } var r0 error - if rf, ok := ret.Get(0).(func(string, string) error); ok { - r0 = rf(spaceID, blobID) + if rf, ok := ret.Get(0).(func(*node.Node) error); ok { + r0 = rf(_a0) } else { r0 = ret.Error(0) } @@ -63,15 +64,14 @@ type Blobstore_Delete_Call struct { } // Delete is a helper method to define mock.On call -// - spaceID string -// - blobID string -func (_e *Blobstore_Expecter) 
Delete(spaceID interface{}, blobID interface{}) *Blobstore_Delete_Call { - return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", spaceID, blobID)} +// - _a0 *node.Node +func (_e *Blobstore_Expecter) Delete(_a0 interface{}) *Blobstore_Delete_Call { + return &Blobstore_Delete_Call{Call: _e.mock.On("Delete", _a0)} } -func (_c *Blobstore_Delete_Call) Run(run func(spaceID string, blobID string)) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) Run(run func(_a0 *node.Node)) *Blobstore_Delete_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string)) + run(args[0].(*node.Node)) }) return _c } @@ -81,14 +81,14 @@ func (_c *Blobstore_Delete_Call) Return(_a0 error) *Blobstore_Delete_Call { return _c } -func (_c *Blobstore_Delete_Call) RunAndReturn(run func(string, string) error) *Blobstore_Delete_Call { +func (_c *Blobstore_Delete_Call) RunAndReturn(run func(*node.Node) error) *Blobstore_Delete_Call { _c.Call.Return(run) return _c } -// Download provides a mock function with given fields: spaceID, blobID, blobSize -func (_m *Blobstore) Download(spaceID string, blobID string, blobSize int64) (io.ReadCloser, error) { - ret := _m.Called(spaceID, blobID, blobSize) +// Download provides a mock function with given fields: _a0 +func (_m *Blobstore) Download(_a0 *node.Node) (io.ReadCloser, error) { + ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for Download") @@ -96,19 +96,19 @@ func (_m *Blobstore) Download(spaceID string, blobID string, blobSize int64) (io var r0 io.ReadCloser var r1 error - if rf, ok := ret.Get(0).(func(string, string, int64) (io.ReadCloser, error)); ok { - return rf(spaceID, blobID, blobSize) + if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { + return rf(_a0) } - if rf, ok := ret.Get(0).(func(string, string, int64) io.ReadCloser); ok { - r0 = rf(spaceID, blobID, blobSize) + if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { + r0 = rf(_a0) } else { if 
ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadCloser) } } - if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { - r1 = rf(spaceID, blobID, blobSize) + if rf, ok := ret.Get(1).(func(*node.Node) error); ok { + r1 = rf(_a0) } else { r1 = ret.Error(1) } @@ -122,16 +122,14 @@ type Blobstore_Download_Call struct { } // Download is a helper method to define mock.On call -// - spaceID string -// - blobID string -// - blobSize int64 -func (_e *Blobstore_Expecter) Download(spaceID interface{}, blobID interface{}, blobSize interface{}) *Blobstore_Download_Call { - return &Blobstore_Download_Call{Call: _e.mock.On("Download", spaceID, blobID, blobSize)} +// - _a0 *node.Node +func (_e *Blobstore_Expecter) Download(_a0 interface{}) *Blobstore_Download_Call { + return &Blobstore_Download_Call{Call: _e.mock.On("Download", _a0)} } -func (_c *Blobstore_Download_Call) Run(run func(spaceID string, blobID string, blobSize int64)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) Run(run func(_a0 *node.Node)) *Blobstore_Download_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string), args[2].(int64)) + run(args[0].(*node.Node)) }) return _c } @@ -141,22 +139,22 @@ func (_c *Blobstore_Download_Call) Return(_a0 io.ReadCloser, _a1 error) *Blobsto return _c } -func (_c *Blobstore_Download_Call) RunAndReturn(run func(string, string, int64) (io.ReadCloser, error)) *Blobstore_Download_Call { +func (_c *Blobstore_Download_Call) RunAndReturn(run func(*node.Node) (io.ReadCloser, error)) *Blobstore_Download_Call { _c.Call.Return(run) return _c } -// Upload provides a mock function with given fields: spaceID, blobID, blobSize, source -func (_m *Blobstore) Upload(spaceID string, blobID string, blobSize int64, source string) error { - ret := _m.Called(spaceID, blobID, blobSize, source) +// Upload provides a mock function with given fields: _a0, source +func (_m *Blobstore) Upload(_a0 *node.Node, source string) error { + ret := _m.Called(_a0, 
source) if len(ret) == 0 { panic("no return value specified for Upload") } var r0 error - if rf, ok := ret.Get(0).(func(string, string, int64, string) error); ok { - r0 = rf(spaceID, blobID, blobSize, source) + if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { + r0 = rf(_a0, source) } else { r0 = ret.Error(0) } @@ -170,17 +168,15 @@ type Blobstore_Upload_Call struct { } // Upload is a helper method to define mock.On call -// - spaceID string -// - blobID string -// - blobSize int64 +// - _a0 *node.Node // - source string -func (_e *Blobstore_Expecter) Upload(spaceID interface{}, blobID interface{}, blobSize interface{}, source interface{}) *Blobstore_Upload_Call { - return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", spaceID, blobID, blobSize, source)} +func (_e *Blobstore_Expecter) Upload(_a0 interface{}, source interface{}) *Blobstore_Upload_Call { + return &Blobstore_Upload_Call{Call: _e.mock.On("Upload", _a0, source)} } -func (_c *Blobstore_Upload_Call) Run(run func(spaceID string, blobID string, blobSize int64, source string)) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) Run(run func(_a0 *node.Node, source string)) *Blobstore_Upload_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string), args[1].(string), args[2].(int64), args[3].(string)) + run(args[0].(*node.Node), args[1].(string)) }) return _c } @@ -190,7 +186,7 @@ func (_c *Blobstore_Upload_Call) Return(_a0 error) *Blobstore_Upload_Call { return _c } -func (_c *Blobstore_Upload_Call) RunAndReturn(run func(string, string, int64, string) error) *Blobstore_Upload_Call { +func (_c *Blobstore_Upload_Call) RunAndReturn(run func(*node.Node, string) error) *Blobstore_Upload_Call { _c.Call.Return(run) return _c } diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 571fd07943..3d8a057d8b 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -21,6 +21,7 @@ 
package tree import ( "bytes" "context" + "fmt" "io" "io/fs" "os" @@ -54,9 +55,9 @@ func init() { // Blobstore defines an interface for storing blobs in a blobstore type Blobstore interface { - Upload(spaceID, blobID string, blobSize int64, source string) error - Download(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) - Delete(spaceID, blobID string) error + Upload(node *node.Node, source string) error + Download(node *node.Node) (io.ReadCloser, error) + Delete(node *node.Node) error } // Tree manages a hierarchical tree @@ -694,7 +695,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node. // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n.SpaceID, n.BlobID); err != nil { + if err := t.DeleteBlob(n); err != nil { logger.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -723,7 +724,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node. } if bID != "" { - if err := t.DeleteBlob(n.SpaceID, bID); err != nil { + if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { logger.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -740,22 +741,29 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(spaceID, blobID string, blobSize int64, source string) error { - return t.blobstore.Upload(spaceID, blobID, blobSize, source) +func (t *Tree) WriteBlob(node *node.Node, source string) error { + return t.blobstore.Upload(node, source) } // ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) { - if blobID == "" { +func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { + if node.BlobID == "" { // there is no blob yet - we are dealing with a 0 byte file return 
io.NopCloser(bytes.NewReader([]byte{})), nil } - return t.blobstore.Download(spaceID, blobID, blobSize) + return t.blobstore.Download(node) } // DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(spaceID, blobID string) error { - return t.blobstore.Delete(spaceID, blobID) +func (t *Tree) DeleteBlob(node *node.Node) error { + if node == nil { + return fmt.Errorf("could not delete blob, nil node was given") + } + if node.BlobID == "" { + return fmt.Errorf("could not delete blob, node with empty blob id was given") + } + + return t.blobstore.Delete(node) } // TODO check if node exists? diff --git a/pkg/storage/utils/decomposedfs/tree/tree_test.go b/pkg/storage/utils/decomposedfs/tree/tree_test.go index 2f9bb39712..74acdf7473 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree_test.go +++ b/pkg/storage/utils/decomposedfs/tree/tree_test.go @@ -128,7 +128,7 @@ var _ = Describe("Tree", func() { }) It("does not delete the blob from the blobstore", func() { - env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")) + env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node")) }) }) }) @@ -139,7 +139,7 @@ var _ = Describe("Tree", func() { ) JustBeforeEach(func() { - env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil) + env.Blobstore.On("Delete", mock.AnythingOfType("*node.Node")).Return(nil) trashPath = path.Join(env.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) Expect(t.Delete(env.Ctx, n)).To(Succeed()) }) @@ -160,7 +160,7 @@ var _ = Describe("Tree", func() { }) It("deletes the blob from the blobstore", func() { - env.Blobstore.AssertCalled(GinkgoT(), "Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")) + env.Blobstore.AssertCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node")) }) }) @@ -264,7 +264,7 @@ var _ = Describe("Tree", func() { trashPath = 
path.Join(env.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) Expect(t.Delete(env.Ctx, n)).To(Succeed()) - env.Blobstore.On("Delete", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(nil) + env.Blobstore.On("Delete", mock.Anything).Return(nil) }) Describe("PurgeRecycleItemFunc", func() { diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 3bfc4375da..45f48ec844 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -67,9 +67,9 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) - WriteBlob(spaceID, blobID string, blobSize int64, binPath string) error - ReadBlob(spaceID, blobID string, blobSize int64) (io.ReadCloser, error) - DeleteBlob(spaceID, blobID string) error + WriteBlob(node *node.Node, binPath string) error + ReadBlob(node *node.Node) (io.ReadCloser, error) + DeleteBlob(node *node.Node) error Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) } @@ -279,9 +279,11 @@ func (session *OcisSession) Finalize() (err error) { ctx, span := tracer.Start(session.Context(context.Background()), "Finalize") defer span.End() + revisionNode := &node.Node{SpaceID: session.SpaceID(), BlobID: session.ID(), Blobsize: session.Size()} + // upload the data to the blobstore _, subspan := tracer.Start(ctx, "WriteBlob") - err = session.store.tp.WriteBlob(session.SpaceID(), session.ID(), session.Size(), session.binPath()) + err = session.store.tp.WriteBlob(revisionNode, session.binPath()) subspan.End() if err != nil { return errors.Wrap(err, "failed to upload file to blobstore") @@ -344,7 +346,7 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, 
cleanBin, cleanInfo bool // actually delete the node session.removeNode(ctx) } - // FIXME else if the upload has become a revision, delete the revision + // FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node } } diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index 63d7b950cb..03ac4369ea 100644 --- a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -19,6 +19,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/mocks" @@ -141,13 +142,13 @@ var _ = Describe("Async file uploads", Ordered, func() { Expect(err).ToNot(HaveOccurred()) ref.ResourceId = &resID - bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). + bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). Return(nil). 
Run(func(args mock.Arguments) { - size := args.Get(2).(int64) - data, err := os.ReadFile(args.Get(3).(string)) + n := args.Get(0).(*node.Node) + data, err := os.ReadFile(args.Get(1).(string)) Expect(err).ToNot(HaveOccurred()) - Expect(len(data)).To(Equal(int(size))) + Expect(len(data)).To(Equal(int(n.Blobsize))) }) // start upload of a file diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 90e37edb58..5899b59f96 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -238,7 +238,7 @@ var _ = Describe("File uploads", func() { When("the user initiates a zero byte file upload", func() { It("succeeds", func() { - bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). + bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). Return(nil) uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{}) @@ -253,7 +253,7 @@ var _ = Describe("File uploads", func() { }) It("fails when trying to upload empty data. 0-byte uploads are finished during initialization already", func() { - bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). + bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). Return(nil) uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{}) @@ -288,10 +288,10 @@ var _ = Describe("File uploads", func() { uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]} - bs.On("Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")). + bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything). Return(nil). 
Run(func(args mock.Arguments) { - data, err := os.ReadFile(args.Get(3).(string)) + data, err := os.ReadFile(args.Get(1).(string)) Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal([]byte("0123456789"))) @@ -304,7 +304,7 @@ var _ = Describe("File uploads", func() { }, nil) Expect(err).ToNot(HaveOccurred()) - bs.AssertCalled(GinkgoT(), "Upload", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("string")) + bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) resources, err := fs.ListFolder(ctx, rootRef, []string{}, []string{})