Skip to content

Commit

Permalink
Fix uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
aduffeck committed Mar 18, 2024
1 parent d1a0f7a commit deb3012
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 58 deletions.
28 changes: 28 additions & 0 deletions pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
Expand Down Expand Up @@ -699,6 +700,33 @@ func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string,
return spaceid, entry, nil
}

// InitNewNode initializes a new node
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, err
}

// create and write lock new node metadata
unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath())
if err != nil {
return nil, err
}

// we also need to touch the actual node file here it stores the mtime of the resource
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
return unlock, err
}
h.Close()

if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
return unlock, err
}

return unlock, nil
}

// TODO check if node exists?
func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
ctx, span := tracer.Start(ctx, "createDirNode")
Expand Down
27 changes: 27 additions & 0 deletions pkg/storage/utils/decomposedfs/node/mocks/Tree.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type Tree interface {
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error)

InitNewNode(ctx context.Context, n *Node, fsize uint64) (metadata.UnlockFunc, error)

WriteBlob(node *Node, source string) error
ReadBlob(node *Node) (io.ReadCloser, error)
DeleteBlob(node *Node) error
Expand Down
44 changes: 44 additions & 0 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
Expand Down Expand Up @@ -663,6 +664,49 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa
return rn, fn, nil
}

// InitNewNode initializes a new node
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, err
}

// create and write lock new node metadata
unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath())
if err != nil {
return nil, err
}

// we also need to touch the actual node file here it stores the mtime of the resource
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
return unlock, err
}
h.Close()

if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
return unlock, err
}

// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentPath(), n.Name)
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
log.Info().Msg("initNewNode: creating symlink")

if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
log.Info().Err(err).Msg("initNewNode: symlink failed")
if errors.Is(err, fs.ErrExist) {
log.Info().Err(err).Msg("initNewNode: symlink already exists")
return unlock, errtypes.AlreadyExists(n.Name)
}
return unlock, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
}
log.Info().Msg("initNewNode: symlink created")

return unlock, nil
}

func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node.Node) error {
logger := appctx.GetLogger(ctx)

Expand Down
40 changes: 7 additions & 33 deletions pkg/storage/utils/decomposedfs/upload/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type PermissionsChecker interface {
// OcisStore manages upload sessions
type OcisStore struct {
lu node.PathLookup
tp Tree
tp node.Tree
root string
pub events.Publisher
async bool
Expand All @@ -63,7 +63,7 @@ type OcisStore struct {
}

// NewSessionStore returns a new OcisStore
func NewSessionStore(lu node.PathLookup, tp Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions, disableVersioning bool) *OcisStore {
func NewSessionStore(lu node.PathLookup, tp node.Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions, disableVersioning bool) *OcisStore {
return &OcisStore{
lu: lu,
tp: tp,
Expand Down Expand Up @@ -219,10 +219,12 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
appctx.GetLogger(ctx).Error().Err(err).Msg("failed to cache id")
}
}
unlock, err = store.initNewNode(ctx, session, n, uint64(session.Size()))

unlock, err = store.tp.InitNewNode(ctx, n, uint64(session.Size()))
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("failed to init new node")
}
session.info.MetaData["sizeDiff"] = strconv.FormatInt(session.Size(), 10)
}
defer func() {
if unlock == nil {
Expand Down Expand Up @@ -267,34 +269,6 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
return n, nil
}

func (store OcisStore) initNewNode(ctx context.Context, session *OcisSession, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, err
}

// create and write lock new node metadata
unlock, err := store.lu.MetadataBackend().Lock(n.InternalPath())
if err != nil {
return nil, err
}

// we also need to touch the actual node file here it stores the mtime of the resource
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
return unlock, err
}
h.Close()

if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
return unlock, err
}

// on a new file the sizeDiff is the fileSize
session.info.MetaData["sizeDiff"] = strconv.FormatInt(int64(fsize), 10)
return unlock, nil
}

func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (metadata.UnlockFunc, error) {
targetPath := n.InternalPath()

Expand Down Expand Up @@ -363,12 +337,12 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
versionPath = session.store.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano))

// create version node
if _, err := os.Create(session.info.MetaData["versionsPath"]); err != nil {
if _, err := os.Create(versionPath); err != nil {
return unlock, err
}

// copy blob metadata to version node
if err := store.lu.CopyMetadataWithSourceLock(ctx, targetPath, session.info.MetaData["versionsPath"], func(attributeName string, value []byte) (newValue []byte, copy bool) {
if err := store.lu.CopyMetadataWithSourceLock(ctx, targetPath, versionPath, func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/utils/decomposedfs/upload/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree"
)

// TestInitNewNode calls greetings.initNewNode
Expand All @@ -21,8 +22,9 @@ func TestInitNewNode(t *testing.T) {
root := t.TempDir()

lookup := lookup.New(metadata.NewMessagePackBackend(root, cache.Config{}), &options.Options{Root: root})
tp := tree.New(lookup, nil, &options.Options{}, nil)

store := NewSessionStore(lookup, nil, root, nil, false, options.TokenOptions{}, false)
store := NewSessionStore(lookup, tp, root, nil, false, options.TokenOptions{}, false)

rootNode := node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "", "", 0, "", providerv1beta1.ResourceType_RESOURCE_TYPE_CONTAINER, &userv1beta1.UserId{}, lookup)
rootNode.Exists = true
Expand All @@ -34,7 +36,7 @@ func TestInitNewNode(t *testing.T) {
}
n := node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "930b7a2e-b745-41e1-8a9b-712582021842", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "newchild", 10, "26493c53-2634-45f8-949f-dc07b88df9b0", providerv1beta1.ResourceType_RESOURCE_TYPE_FILE, &userv1beta1.UserId{}, lookup)
n.SpaceRoot = rootNode
unlock, err := store.initNewNode(context.Background(), store.New(context.Background()), n, 10)
unlock, err := store.tp.InitNewNode(context.Background(), n, 10)
if err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -45,7 +47,7 @@ func TestInitNewNode(t *testing.T) {
// try initializing the same new node again in case a concurrent requests tries to create a file with the same name
n = node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "a6ede986-cfcd-41c5-a820-6eee955a1c2b", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "newchild", 10, "26493c53-2634-45f8-949f-dc07b88df9b0", providerv1beta1.ResourceType_RESOURCE_TYPE_FILE, &userv1beta1.UserId{}, lookup)
n.SpaceRoot = rootNode
unlock2, err := store.initNewNode(context.Background(), store.New(context.Background()), n, 10)
unlock2, err := store.tp.InitNewNode(context.Background(), n, 10)
if _, ok := err.(errtypes.IsAlreadyExists); !ok {
t.Fatalf(`initNewNode(with same 'newchild' name), %v, want %v`, err, errtypes.AlreadyExists("newchild"))
}
Expand Down
21 changes: 0 additions & 21 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,6 @@ func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload")
}

// Tree is used to manage a tree hierarchy
type Tree interface {
Setup() error

GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error)
ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error)
// CreateHome(owner *userpb.UserId) (n *node.Node, err error)
CreateDir(ctx context.Context, node *node.Node) (err error)
// CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error
Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error)
Delete(ctx context.Context, node *node.Node) (err error)
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error)

WriteBlob(node *node.Node, binPath string) error
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error

Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
}

var defaultFilePerm = os.FileMode(0664)

// WriteChunk writes the stream from the reader to the given offset of the upload
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/upload_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ var _ = Describe("Async file uploads", Ordered, func() {

})

It("succeeds eventually, creating a new version", func() {
FIt("succeeds eventually, creating a new version", func() {
// finish postprocessing
con <- events.PostprocessingFinished{
UploadID: uploadID,
Expand Down

0 comments on commit deb3012

Please sign in to comment.