Decomposedfs propagate diff #3482

Merged Nov 23, 2022 (41 commits; changes shown from 35 commits)

Commits
fced7bd
rewrite finish upload to get atomic size diff
butonic Nov 17, 2022
c58f166
decomposedfs: make finish upload atomic
butonic Nov 17, 2022
150fe7d
add lock functions
butonic Nov 17, 2022
4dae4b6
allow locking non existing files, fix locking with existing lock
butonic Nov 17, 2022
3d0977a
make returned error recognizable
butonic Nov 17, 2022
11fc7c4
more lock fixes
butonic Nov 17, 2022
47f29a1
do not log nil error
butonic Nov 18, 2022
bf30d98
don't overwrite original error when deleting the blob fails
butonic Nov 18, 2022
786d0b3
always release node lock
butonic Nov 18, 2022
b955128
keep correct mtimes
butonic Nov 18, 2022
3db9243
fix adler checksum
butonic Nov 18, 2022
38c1d93
stat before closing
butonic Nov 18, 2022
cb990cd
fix lint ... and proper revision download is not covered by the CS3 api
butonic Nov 18, 2022
57aaca2
fix permissions when downloading grants
butonic Nov 18, 2022
82419a3
update changelog
butonic Nov 18, 2022
0686a7c
fix locks and revision restore
butonic Nov 18, 2022
b162894
assemble permissions on the node when checking a revision
butonic Nov 18, 2022
d9534ef
fix typos
butonic Nov 18, 2022
36b8a84
allow revision download when user has initiate download and list revi…
butonic Nov 18, 2022
c90d8c4
fix reading revision node
butonic Nov 21, 2022
8679103
do not forget revision delimiter
butonic Nov 21, 2022
f2d29bb
drop old revision
butonic Nov 21, 2022
2401c1f
remove unexpected failures
butonic Nov 21, 2022
19ee0cc
update changelog and unexpected passes
butonic Nov 21, 2022
18e8cf8
propagate size diff
butonic Nov 21, 2022
77b889d
fix move, add changelog
butonic Nov 22, 2022
4549f32
fix size diff calculation for now files
butonic Nov 22, 2022
a58258e
incorporate reviews
butonic Nov 22, 2022
cb2e355
update tree mock
butonic Nov 22, 2022
dc6cf0a
fix reva tests
butonic Nov 22, 2022
ad36838
update tree unit tests
butonic Nov 22, 2022
c9e5aa1
clarify revisions code
butonic Nov 22, 2022
8439c60
silence valid return codes
butonic Nov 22, 2022
8cb9c94
read size before moving node to trash
butonic Nov 22, 2022
903af6c
fix s3ng revision download
butonic Nov 22, 2022
75cdd22
clean up revision on error
butonic Nov 23, 2022
ef0382f
use verbose newNode name
butonic Nov 23, 2022
ca9a993
make FlockFile public and more readable
butonic Nov 23, 2022
5b37813
introduce FileLockSuffix
butonic Nov 23, 2022
9f17301
use filelock package instead of hardcoded strings
butonic Nov 23, 2022
e8575a9
make linter happy
butonic Nov 23, 2022
7 changes: 7 additions & 0 deletions changelog/unreleased/decomposedfs-finish-upload-rewrite.md
@@ -0,0 +1,7 @@
Bugfix: decomposedfs fix revision download

We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some corner cases, allows us to calculate the size diff atomically, and fixes downloading revisions.

https://github.com/cs3org/reva/pull/3473
https://github.com/owncloud/ocis/issues/765
https://github.com/owncloud/ocis/issues/3868
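
For illustration, here is a minimal sketch of the atomic size-diff idea described in this changelog entry: the old size is read and the new size written while an exclusive flock on a companion lock file is held. This is not the reva implementation; the attribute name, the ".flock" suffix and the helper name are assumptions.

```go
package main

import (
	"strconv"

	"github.com/gofrs/flock"
	"github.com/pkg/xattr"
)

// blobsizeAttr and the ".flock" suffix are assumed names for illustration,
// not the exact constants used by decomposedfs.
const blobsizeAttr = "user.example.blobsize"

// finishUploadSketch updates the node's blob size under an exclusive file
// lock and returns the size diff that can then be propagated up the tree.
func finishUploadSketch(nodePath string, newSize int64) (int64, error) {
	fl := flock.New(nodePath + ".flock")
	if err := fl.Lock(); err != nil { // exclusive write lock
		return 0, err
	}
	defer fl.Unlock()

	// read the previous size while holding the lock
	var oldSize int64
	if raw, err := xattr.Get(nodePath, blobsizeAttr); err == nil {
		oldSize, _ = strconv.ParseInt(string(raw), 10, 64)
	}

	// persist the new size; old and new size are read and written atomically
	// with respect to other writers honouring the same lock
	if err := xattr.Set(nodePath, blobsizeAttr, []byte(strconv.FormatInt(newSize, 10))); err != nil {
		return 0, err
	}
	return newSize - oldSize, nil
}

func main() {
	if diff, err := finishUploadSketch("/tmp/example-node", 2048); err == nil {
		_ = diff // the diff is what Propagate(ctx, node, sizeDiff) consumes
	}
}
```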
6 changes: 6 additions & 0 deletions changelog/unreleased/decomposedfs-propagate-sizediff.md
@@ -0,0 +1,6 @@
Bugfix: decomposedfs propagate sizediff

We now propagate the size diff instead of recalculating the treesize. This fixes slow upload speeds in large folders.

https://github.com/cs3org/reva/pull/3482
https://github.com/owncloud/ocis/issues/5061
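
A toy in-memory model of the diff-based propagation this entry describes: only the delta travels up to the ancestors instead of recomputing each ancestor's treesize. The real code lives in the decomposedfs tree package; the names here are illustrative only.

```go
package main

import "fmt"

// toyNode mimics the treesize bookkeeping of decomposedfs in memory.
type toyNode struct {
	parent   string
	treesize int64
}

// propagateSizeDiff adds the diff to every ancestor instead of recomputing
// each ancestor's treesize by walking its whole subtree.
func propagateSizeDiff(nodes map[string]*toyNode, path string, sizeDiff int64) {
	for p := nodes[path].parent; p != ""; p = nodes[p].parent {
		nodes[p].treesize += sizeDiff
	}
}

func main() {
	nodes := map[string]*toyNode{
		"root":          {treesize: 100},
		"root/dir":      {parent: "root", treesize: 40},
		"root/dir/file": {parent: "root/dir", treesize: 10},
	}
	// a file grew by 5 bytes: only the delta travels up the tree
	propagateSizeDiff(nodes, "root/dir/file", 5)
	fmt.Println(nodes["root/dir"].treesize, nodes["root"].treesize) // 45 105
}
```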
10 changes: 10 additions & 0 deletions internal/http/services/owncloud/ocdav/meta.go
@@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"path"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
@@ -70,6 +71,15 @@ func (h *MetaHandler) Handler(s *svc) http.Handler {
errors.HandleWebdavError(logger, w, b, err)
return
}
if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 {
logger := appctx.GetLogger(r.Context())
logger.Warn().Str("id", id).Msg("detected invalid : separated resourceid id, trying to split it ... but fix the client that made the request")
// try splitting with :
parts := strings.SplitN(id, ":", 3)
did.StorageId = parts[0]
did.SpaceId = parts[1]
did.OpaqueId = parts[2]
}

var head string
head, r.URL.Path = router.ShiftPath(r.URL.Path)
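
As a hypothetical example of the fallback above: a legacy colon-joined resource id splits into the three fields the handler assigns. The id value below is made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// legacy clients may send a ":"-joined resource id; the handler above
	// falls back to splitting it after counting at least two ":" separators
	id := "storageid:spaceid:opaqueid" // example value, not from the PR
	parts := strings.SplitN(id, ":", 3)
	fmt.Println(parts[0]) // storageid -> did.StorageId
	fmt.Println(parts[1]) // spaceid   -> did.SpaceId
	fmt.Println(parts[2]) // opaqueid  -> did.OpaqueId
}
```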
9 changes: 8 additions & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -30,6 +30,7 @@ import (
"path"
"path/filepath"
"strconv"
"strings"
"syscall"

cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
@@ -87,7 +88,7 @@ type Tree interface {
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error

Propagate(ctx context.Context, node *node.Node) (err error)
Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
}

// Decomposedfs provides the base for decomposed filesystem implementations
@@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er

// Download returns a reader to the specified resource
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
// check if we are trying to download a revision
// TODO the CS3 api should allow initiating a revision download
if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId)
}

node, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
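
A small sketch of the revision detection used in Download above. The concrete delimiter value is an assumption standing in for node.RevisionIDDelimiter; references detected this way are handed to DownloadRevision as shown in the hunk.

```go
package main

import (
	"fmt"
	"strings"
)

// revisionIDDelimiter stands in for node.RevisionIDDelimiter; the concrete
// value here is an assumption for illustration only.
const revisionIDDelimiter = ".REV."

func main() {
	opaqueID := "nodeid" + revisionIDDelimiter + "2022-11-23T09:15:00Z"
	if strings.Contains(opaqueID, revisionIDDelimiter) {
		// Download would hand this reference to DownloadRevision
		fmt.Println("revision download:", opaqueID)
	}
}
```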
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/grants.go
@@ -220,7 +220,7 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
}
}

return fs.tp.Propagate(ctx, node)
return fs.tp.Propagate(ctx, node, 0)
}

// UpdateGrant updates a grant on a resource
@@ -321,7 +321,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
return err
}

return fs.tp.Propagate(ctx, n)
return fs.tp.Propagate(ctx, n, 0)
}

// extractACEsFromAttrs reads ACEs in the list of attrs from the node
10 changes: 4 additions & 6 deletions pkg/storage/utils/decomposedfs/metadata.go
@@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
if md.Metadata != nil {
if val, ok := md.Metadata["mtime"]; ok {
delete(md.Metadata, "mtime")
err := n.SetMtime(ctx, val)
if err != nil {
if err := n.SetMtimeString(val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set mtime"))
}
}
@@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
// TODO unset when folder is updated or add timestamp to etag?
if val, ok := md.Metadata["etag"]; ok {
delete(md.Metadata, "etag")
err := n.SetEtag(ctx, val)
if err != nil {
if err := n.SetEtag(ctx, val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set etag"))
}
}
@@ -119,7 +117,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.

switch len(errs) {
case 0:
return fs.tp.Propagate(ctx, n)
return fs.tp.Propagate(ctx, n, 0)
case 1:
// TODO Propagate if anything changed
return errs[0]
@@ -209,7 +207,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide
}
switch len(errs) {
case 0:
return fs.tp.Propagate(ctx, n)
return fs.tp.Propagate(ctx, n, 0)
case 1:
// TODO Propagate if anything changed
return errs[0]
10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/mocks/Tree.go

Some generated files are not rendered by default.

62 changes: 42 additions & 20 deletions pkg/storage/utils/decomposedfs/node/node.go
@@ -215,6 +215,19 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
return r, nil
}

// are we reading a revision?
revisionSuffix := ""
if strings.Contains(nodeID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2)
if len(kp) == 2 {
// use the actual node for the metadata lookup
nodeID = kp[0]
// remember revision for blob metadata
revisionSuffix = RevisionIDDelimiter + kp[1]
}
}

// read node
n = &Node{
SpaceID: spaceID,
@@ -223,6 +236,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
SpaceRoot: r,
}

// append back revision to nodeid, even when returning a not existing node
defer func() {
n.ID += revisionSuffix
}()

nodePath := n.InternalPath()

// lookup name in extended attributes
@@ -237,7 +255,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
n.Exists = true

// lookup blobID in extended attributes
n.BlobID, err = n.Xattr(xattrs.BlobIDAttr)
n.BlobID, err = ReadBlobIDAttr(nodePath + revisionSuffix)
switch {
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
@@ -246,7 +264,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
}

// Lookup blobsize
n.Blobsize, err = ReadBlobSizeAttr(nodePath)
n.Blobsize, err = ReadBlobSizeAttr(nodePath + revisionSuffix)
switch {
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
@@ -495,25 +513,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) {
return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil
}

// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(ctx context.Context, mtime string) error {
sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger()
if mt, err := parseMTime(mtime); err == nil {
nodePath := n.InternalPath()
// updating mtime also updates atime
if err := os.Chtimes(nodePath, mt, mt); err != nil {
sublog.Error().Err(err).
Time("mtime", mt).
Msg("could not set mtime")
return errors.Wrap(err, "could not set mtime")
}
} else {
sublog.Error().Err(err).
Str("mtime", mtime).
Msg("could not parse mtime")
return errors.Wrap(err, "could not parse mtime")
// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string
func (n *Node) SetMtimeString(mtime string) error {
mt, err := parseMTime(mtime)
if err != nil {
return err
}
return nil
return n.SetMtime(mt)
}

// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(mtime time.Time) error {
nodePath := n.InternalPath()
// updating mtime also updates atime
return os.Chtimes(nodePath, mtime, mtime)
}

// SetEtag sets the temporary etag of a node if it differs from the current etag
@@ -929,6 +942,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) {
return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10))
}

// GetBlobSize reads the blobsize from the extended attributes
func (n *Node) GetBlobSize() (treesize uint64, err error) {
var b string
if b, err = n.Xattr(xattrs.BlobsizeAttr); err != nil {
return
}
return strconv.ParseUint(b, 10, 64)
}

// SetChecksum writes the checksum with the given checksum type to the extended attributes
func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) {
return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil)))
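
A minimal sketch of how ReadNode, as shown in the hunks above, separates a revision key from the node id for the metadata lookup and re-appends it to the returned node. The delimiter value is an assumption standing in for node.RevisionIDDelimiter.

```go
package main

import (
	"fmt"
	"strings"
)

// revisionIDDelimiter stands in for node.RevisionIDDelimiter; the value is
// assumed for illustration.
const revisionIDDelimiter = ".REV."

// splitRevisionKey separates the base node id from the revision suffix, as
// ReadNode does before reading node metadata; blob metadata is then read
// from "<base><delimiter><key>".
func splitRevisionKey(nodeID string) (baseID, revisionSuffix string) {
	if kp := strings.SplitN(nodeID, revisionIDDelimiter, 2); len(kp) == 2 {
		return kp[0], revisionIDDelimiter + kp[1]
	}
	return nodeID, ""
}

func main() {
	base, suffix := splitRevisionKey("abc123" + revisionIDDelimiter + "v1")
	fmt.Println(base)          // abc123: used for the metadata lookup
	fmt.Println(base + suffix) // abc123.REV.v1: re-appended on the returned node
}
```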
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/node/node_test.go
@@ -89,18 +89,18 @@ var _ = Describe("Node", func() {
n, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())

blobsize := 239485734
blobsize := int64(239485734)
n.Name = "TestName"
n.BlobID = "TestBlobID"
n.Blobsize = int64(blobsize)
n.Blobsize = blobsize

err = n.WriteAllNodeMetadata()
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expect(n2.Name).To(Equal("TestName"))
Expect(n2.BlobID).To(Equal("TestBlobID"))
Expect(n2.Blobsize).To(Equal(int64(blobsize)))
Expect(n2.Blobsize).To(Equal(blobsize))
})
})

13 changes: 13 additions & 0 deletions pkg/storage/utils/decomposedfs/node/permissions.go
@@ -20,10 +20,12 @@ package node

import (
"context"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
@@ -91,6 +93,17 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov
return NoPermissions(), nil
}

// are we reading a revision?
if strings.Contains(n.ID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2)
if len(kp) != 2 {
return NoPermissions(), errtypes.NotFound(n.ID)
}
// use the actual node for the permission assembly
n.ID = kp[0]
}

// check if the current user is the owner
if utils.UserIDEqual(u.Id, n.Owner()) {
return OwnerPermissions(), nil
25 changes: 24 additions & 1 deletion pkg/storage/utils/decomposedfs/node/xattrs.go
@@ -20,6 +20,7 @@ package node

import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/gofrs/flock"
"github.com/pkg/xattr"
)

@@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) {
return xattrs.SetMultiple(n.InternalPath(), attribs)
}

// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock
func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) {
// TODO what if writing the lock fails?
if n.xattrsCache != nil {
for k, v := range attribs {
n.xattrsCache[k] = v
}
}

return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock)
}

// SetXattr sets an extended attribute on the write-through cache/node
func (n *Node) SetXattr(key, val string) (err error) {
if n.xattrsCache != nil {
@@ -43,6 +56,15 @@ func (n *Node) SetXattr(key, val string) (err error) {
return xattrs.Set(n.InternalPath(), key, val)
}

// SetXattrWithLock sets an extended attribute on the write-through cache/node with the given lock
func (n *Node) SetXattrWithLock(key, val string, fileLock *flock.Flock) (err error) {
if n.xattrsCache != nil {
n.xattrsCache[key] = val
}

return xattrs.SetWithLock(n.InternalPath(), key, val, fileLock)
}

// RemoveXattr removes an extended attribute from the write-through cache/node
func (n *Node) RemoveXattr(key string) error {
if n.xattrsCache != nil {
@@ -80,5 +102,6 @@ func (n *Node) Xattr(key string) (string, error) {
if val, ok := n.xattrsCache[key]; ok {
return val, nil
}
return "", xattr.ENOATTR
// wrap the error as xattr does
return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR}
}
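
A hypothetical caller-side sketch of the lock-then-write pattern comparable to the new lock-aware setters above, using github.com/gofrs/flock and github.com/pkg/xattr directly. The ".flock" suffix and attribute name are assumptions; the real SetXattrWithLock/SetXattrsWithLock instead receive an already-held *flock.Flock.

```go
package main

import (
	"fmt"

	"github.com/gofrs/flock"
	"github.com/pkg/xattr"
)

// setWithLock takes an exclusive flock on a companion lock file and writes
// the extended attribute while holding it, so concurrent writers that honour
// the same lock cannot interleave their metadata updates.
func setWithLock(path, key, val string) error {
	fl := flock.New(path + ".flock") // assumed lock-file suffix
	if err := fl.Lock(); err != nil {
		return err
	}
	defer fl.Unlock()
	return xattr.Set(path, key, []byte(val))
}

func main() {
	if err := setWithLock("/tmp/example-node", "user.example.name", "file.txt"); err != nil {
		fmt.Println("set failed:", err)
	}
}
```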