diff --git a/pkg/storage/fs/posix/tree/tree.go b/pkg/storage/fs/posix/tree/tree.go index 5282e9a39fc..c4affa3ce41 100644 --- a/pkg/storage/fs/posix/tree/tree.go +++ b/pkg/storage/fs/posix/tree/tree.go @@ -34,6 +34,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" @@ -699,6 +700,33 @@ func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string, return spaceid, entry, nil } +// InitNewNode initializes a new node +func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) { + // create folder structure (if needed) + if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + return nil, err + } + + // create and write lock new node metadata + unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath()) + if err != nil { + return nil, err + } + + // we also need to touch the actual node file here it stores the mtime of the resource + h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600) + if err != nil { + return unlock, err + } + h.Close() + + if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil { + return unlock, err + } + + return unlock, nil +} + // TODO check if node exists? func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { ctx, span := tracer.Start(ctx, "createDirNode") diff --git a/pkg/storage/utils/decomposedfs/node/mocks/Tree.go b/pkg/storage/utils/decomposedfs/node/mocks/Tree.go index 8070e34fbda..ffaccdad356 100644 --- a/pkg/storage/utils/decomposedfs/node/mocks/Tree.go +++ b/pkg/storage/utils/decomposedfs/node/mocks/Tree.go @@ -26,6 +26,7 @@ import ( io "io" + metadata "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" mock "github.com/stretchr/testify/mock" node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" @@ -118,6 +119,32 @@ func (_m *Tree) GetMD(ctx context.Context, _a1 *node.Node) (fs.FileInfo, error) return r0, r1 } +// InitNewNode provides a mock function with given fields: ctx, n, fsize +func (_m *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) { + ret := _m.Called(ctx, n, fsize) + + var r0 metadata.UnlockFunc + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *node.Node, uint64) (metadata.UnlockFunc, error)); ok { + return rf(ctx, n, fsize) + } + if rf, ok := ret.Get(0).(func(context.Context, *node.Node, uint64) metadata.UnlockFunc); ok { + r0 = rf(ctx, n, fsize) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.UnlockFunc) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *node.Node, uint64) error); ok { + r1 = rf(ctx, n, fsize) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ListFolder provides a mock function with given fields: ctx, _a1 func (_m *Tree) ListFolder(ctx context.Context, _a1 *node.Node) ([]*node.Node, error) { ret := _m.Called(ctx, _a1) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index f9a4101679f..5aece02930f 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -101,6 +101,8 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error) + InitNewNode(ctx context.Context, n *Node, fsize uint64) (metadata.UnlockFunc, error) + WriteBlob(node *Node, source string) error ReadBlob(node *Node) (io.ReadCloser, error) DeleteBlob(node *Node) error diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 54db203f4c3..be7ce59afd0 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -34,6 +34,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" @@ -663,6 +664,49 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa return rn, fn, nil } +// InitNewNode initializes a new node +func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) { + // create folder structure (if needed) + if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + return nil, err + } + + // create and write lock new node metadata + unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath()) + if err != nil { + return nil, err + } + + // we also need to touch the actual node file here it stores the mtime of the resource + h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600) + if err != nil { + return unlock, err + } + h.Close() + + if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil { + return unlock, err + } + + // link child name to parent if it is new + childNameLink := filepath.Join(n.ParentPath(), n.Name) + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger() + log.Info().Msg("initNewNode: creating symlink") + + if err = os.Symlink(relativeNodePath, childNameLink); err != nil { + log.Info().Err(err).Msg("initNewNode: symlink failed") + if errors.Is(err, fs.ErrExist) { + log.Info().Err(err).Msg("initNewNode: symlink already exists") + return unlock, errtypes.AlreadyExists(n.Name) + } + return unlock, errors.Wrap(err, "Decomposedfs: could not symlink child entry") + } + log.Info().Msg("initNewNode: symlink created") + + return unlock, nil +} + func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node.Node) error { logger := appctx.GetLogger(ctx) diff --git a/pkg/storage/utils/decomposedfs/upload/store.go b/pkg/storage/utils/decomposedfs/upload/store.go index 22efd41e059..95200f8c218 100644 --- a/pkg/storage/utils/decomposedfs/upload/store.go +++ b/pkg/storage/utils/decomposedfs/upload/store.go @@ -54,7 +54,7 @@ type PermissionsChecker interface { // OcisStore manages upload sessions type OcisStore struct { lu node.PathLookup - tp Tree + tp node.Tree root string pub events.Publisher async bool @@ -63,7 +63,7 @@ type OcisStore struct { } // NewSessionStore returns a new OcisStore -func NewSessionStore(lu node.PathLookup, tp Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions, disableVersioning bool) *OcisStore { +func NewSessionStore(lu node.PathLookup, tp node.Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions, disableVersioning bool) *OcisStore { return &OcisStore{ lu: lu, tp: tp, @@ -219,10 +219,12 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. appctx.GetLogger(ctx).Error().Err(err).Msg("failed to cache id") } } - unlock, err = store.initNewNode(ctx, session, n, uint64(session.Size())) + + unlock, err = store.tp.InitNewNode(ctx, n, uint64(session.Size())) if err != nil { appctx.GetLogger(ctx).Error().Err(err).Msg("failed to init new node") } + session.info.MetaData["sizeDiff"] = strconv.FormatInt(session.Size(), 10) } defer func() { if unlock == nil { @@ -267,34 +269,6 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. return n, nil } -func (store OcisStore) initNewNode(ctx context.Context, session *OcisSession, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) { - // create folder structure (if needed) - if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { - return nil, err - } - - // create and write lock new node metadata - unlock, err := store.lu.MetadataBackend().Lock(n.InternalPath()) - if err != nil { - return nil, err - } - - // we also need to touch the actual node file here it stores the mtime of the resource - h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600) - if err != nil { - return unlock, err - } - h.Close() - - if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil { - return unlock, err - } - - // on a new file the sizeDiff is the fileSize - session.info.MetaData["sizeDiff"] = strconv.FormatInt(int64(fsize), 10) - return unlock, nil -} - func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (metadata.UnlockFunc, error) { targetPath := n.InternalPath() @@ -363,12 +337,12 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess versionPath = session.store.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano)) // create version node - if _, err := os.Create(session.info.MetaData["versionsPath"]); err != nil { + if _, err := os.Create(versionPath); err != nil { return unlock, err } // copy blob metadata to version node - if err := store.lu.CopyMetadataWithSourceLock(ctx, targetPath, session.info.MetaData["versionsPath"], func(attributeName string, value []byte) (newValue []byte, copy bool) { + if err := store.lu.CopyMetadataWithSourceLock(ctx, targetPath, versionPath, func(attributeName string, value []byte) (newValue []byte, copy bool) { return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || attributeName == prefixes.TypeAttr || attributeName == prefixes.BlobIDAttr || diff --git a/pkg/storage/utils/decomposedfs/upload/store_test.go b/pkg/storage/utils/decomposedfs/upload/store_test.go index bdf2f89c4d0..58dce7ce9d4 100644 --- a/pkg/storage/utils/decomposedfs/upload/store_test.go +++ b/pkg/storage/utils/decomposedfs/upload/store_test.go @@ -13,6 +13,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" ) // TestInitNewNode calls greetings.initNewNode @@ -21,8 +22,9 @@ func TestInitNewNode(t *testing.T) { root := t.TempDir() lookup := lookup.New(metadata.NewMessagePackBackend(root, cache.Config{}), &options.Options{Root: root}) + tp := tree.New(lookup, nil, &options.Options{}, nil) - store := NewSessionStore(lookup, nil, root, nil, false, options.TokenOptions{}, false) + store := NewSessionStore(lookup, tp, root, nil, false, options.TokenOptions{}, false) rootNode := node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "", "", 0, "", providerv1beta1.ResourceType_RESOURCE_TYPE_CONTAINER, &userv1beta1.UserId{}, lookup) rootNode.Exists = true @@ -34,7 +36,7 @@ func TestInitNewNode(t *testing.T) { } n := node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "930b7a2e-b745-41e1-8a9b-712582021842", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "newchild", 10, "26493c53-2634-45f8-949f-dc07b88df9b0", providerv1beta1.ResourceType_RESOURCE_TYPE_FILE, &userv1beta1.UserId{}, lookup) n.SpaceRoot = rootNode - unlock, err := store.initNewNode(context.Background(), store.New(context.Background()), n, 10) + unlock, err := store.tp.InitNewNode(context.Background(), n, 10) if err != nil { t.Fatalf(err.Error()) } @@ -45,7 +47,7 @@ func TestInitNewNode(t *testing.T) { // try initializing the same new node again in case a concurrent requests tries to create a file with the same name n = node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "a6ede986-cfcd-41c5-a820-6eee955a1c2b", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "newchild", 10, "26493c53-2634-45f8-949f-dc07b88df9b0", providerv1beta1.ResourceType_RESOURCE_TYPE_FILE, &userv1beta1.UserId{}, lookup) n.SpaceRoot = rootNode - unlock2, err := store.initNewNode(context.Background(), store.New(context.Background()), n, 10) + unlock2, err := store.tp.InitNewNode(context.Background(), n, 10) if _, ok := err.(errtypes.IsAlreadyExists); !ok { t.Fatalf(`initNewNode(with same 'newchild' name), %v, want %v`, err, errtypes.AlreadyExists("newchild")) } diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 1fd88a82719..cb331630827 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -50,27 +50,6 @@ func init() { tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload") } -// Tree is used to manage a tree hierarchy -type Tree interface { - Setup() error - - GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error) - ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error) - // CreateHome(owner *userpb.UserId) (n *node.Node, err error) - CreateDir(ctx context.Context, node *node.Node) (err error) - // CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error - Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) - Delete(ctx context.Context, node *node.Node) (err error) - RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) - PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) - - WriteBlob(node *node.Node, binPath string) error - ReadBlob(node *node.Node) (io.ReadCloser, error) - DeleteBlob(node *node.Node) error - - Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) -} - var defaultFilePerm = os.FileMode(0664) // WriteChunk writes the stream from the reader to the given offset of the upload diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index d200bebe5b2..ef71fb86456 100644 --- a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -342,7 +342,7 @@ var _ = Describe("Async file uploads", Ordered, func() { }) - It("succeeds eventually, creating a new version", func() { + FIt("succeeds eventually, creating a new version", func() { // finish postprocessing con <- events.PostprocessingFinished{ UploadID: uploadID,