Skip to content

Commit

Permalink
Decomposedfs propagate diff (#3482)
Browse files Browse the repository at this point in the history
* rewrite finish upload to get atomic size diff

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* decomposedfs: make finish upload atomic

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* add lock functions

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* allow locking non existing files, fix locking with existing lock

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* make returned error recognizable

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* more lock fixes

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* do not log nil error

Co-authored-by: Andre Duffeck <aduffeck@users.noreply.github.com>

* don't overwrite original error when deleting the blob fails

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* always release node lock

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* keep correct mtimes

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix adler checksum

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* stat before closing

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix lint ... and proper revision download is not covered by the CS3 api

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix permissions when downloading grants

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* update changelog

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix locks and revision restore

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* assemble permissions on the node when checking a revision

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix typos

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* allow revision download when user has initiate download and list revision permission

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix reading revision node

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* do not forget revision delimiter

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* drop old revision

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* remove unexpected failures

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* update changelog and unexpected passes

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* propagate size diff

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix move, add changelog

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix size diff calculation for now files

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* incorporate reviews

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* update tree mock

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix reva tests

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* update tree unit tests

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* clarify revisions code

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* silence valid return codes

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* read size before moving node to trash

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* fix s3ng revision download

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* clean up revision on error

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* use verbose newNode name

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* make FlockFile public and more readable

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* introduce FileLockSuffix

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* use filelock package instead of hardcoded strings

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

* make linter happy

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
Co-authored-by: Andre Duffeck <aduffeck@users.noreply.github.com>
  • Loading branch information
butonic and aduffeck authored Nov 23, 2022
1 parent 21ade71 commit cf8ce44
Show file tree
Hide file tree
Showing 21 changed files with 589 additions and 250 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/decomposedfs-finish-upload-rewrite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Bugfix: decomposedfs fix revision download

We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases, allows us to calculate the size diff atomically and fixes downloading revisions.

https://github.com/cs3org/reva/pull/3473
https://github.com/owncloud/ocis/issues/765
https://github.com/owncloud/ocis/issues/3868
6 changes: 6 additions & 0 deletions changelog/unreleased/decomposedfs-propagate-sizediff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Bugfix: decomposedfs propagate sizediff

We now propagate the size diff instead of calculating the treesize. This fixes the slower upload speeds in large folders.

https://github.com/cs3org/reva/pull/3482
https://github.com/owncloud/ocis/issues/5061
10 changes: 10 additions & 0 deletions internal/http/services/owncloud/ocdav/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"path"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -70,6 +71,15 @@ func (h *MetaHandler) Handler(s *svc) http.Handler {
errors.HandleWebdavError(logger, w, b, err)
return
}
if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 {
logger := appctx.GetLogger(r.Context())
logger.Warn().Str("id", id).Msg("detected invalid : separated resourceid id, trying to split it ... but fix the client that made the request")
// try splitting with :
parts := strings.SplitN(id, ":", 3)
did.StorageId = parts[0]
did.SpaceId = parts[1]
did.OpaqueId = parts[2]
}

var head string
head, r.URL.Path = router.ShiftPath(r.URL.Path)
Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"path"
"path/filepath"
"strconv"
"strings"
"syscall"

cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
Expand Down Expand Up @@ -87,7 +88,7 @@ type Tree interface {
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error

Propagate(ctx context.Context, node *node.Node) (err error)
Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
}

// Decomposedfs provides the base for decomposed filesystem implementations
Expand Down Expand Up @@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er

// Download returns a reader to the specified resource
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
// check if we are trying to download a revision
// TODO the CS3 api should allow initiating a revision download
if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId)
}

node, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
}
}

return fs.tp.Propagate(ctx, node)
return fs.tp.Propagate(ctx, node, 0)
}

// UpdateGrant updates a grant on a resource
Expand Down Expand Up @@ -321,7 +321,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
return err
}

return fs.tp.Propagate(ctx, n)
return fs.tp.Propagate(ctx, n, 0)
}

// extractACEsFromAttrs reads ACEs in the list of attrs from the node
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/utils/decomposedfs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
if md.Metadata != nil {
if val, ok := md.Metadata["mtime"]; ok {
delete(md.Metadata, "mtime")
err := n.SetMtime(ctx, val)
if err != nil {
if err := n.SetMtimeString(val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set mtime"))
}
}
Expand All @@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
// TODO unset when folder is updated or add timestamp to etag?
if val, ok := md.Metadata["etag"]; ok {
delete(md.Metadata, "etag")
err := n.SetEtag(ctx, val)
if err != nil {
if err := n.SetEtag(ctx, val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set etag"))
}
}
Expand Down Expand Up @@ -119,7 +117,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.

switch len(errs) {
case 0:
return fs.tp.Propagate(ctx, n)
return fs.tp.Propagate(ctx, n, 0)
case 1:
// TODO Propagate if anything changed
return errs[0]
Expand Down Expand Up @@ -209,7 +207,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide
}
switch len(errs) {
case 0:
return fs.tp.Propagate(ctx, n)
return fs.tp.Propagate(ctx, n, 0)
case 1:
// TODO Propagate if anything changed
return errs[0]
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/mocks/Tree.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 42 additions & 20 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
return r, nil
}

// are we reading a revision?
revisionSuffix := ""
if strings.Contains(nodeID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2)
if len(kp) == 2 {
// use the actual node for the metadata lookup
nodeID = kp[0]
// remember revision for blob metadata
revisionSuffix = RevisionIDDelimiter + kp[1]
}
}

// read node
n = &Node{
SpaceID: spaceID,
Expand All @@ -223,6 +236,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
SpaceRoot: r,
}

// append back revision to nodeid, even when returning a not existing node
defer func() {
n.ID += revisionSuffix
}()

nodePath := n.InternalPath()

// lookup name in extended attributes
Expand All @@ -237,7 +255,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
n.Exists = true

// lookup blobID in extended attributes
n.BlobID, err = n.Xattr(xattrs.BlobIDAttr)
n.BlobID, err = ReadBlobIDAttr(nodePath + revisionSuffix)
switch {
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
Expand All @@ -246,7 +264,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
}

// Lookup blobsize
n.Blobsize, err = ReadBlobSizeAttr(nodePath)
n.Blobsize, err = ReadBlobSizeAttr(nodePath + revisionSuffix)
switch {
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
Expand Down Expand Up @@ -495,25 +513,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) {
return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil
}

// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(ctx context.Context, mtime string) error {
sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger()
if mt, err := parseMTime(mtime); err == nil {
nodePath := n.InternalPath()
// updating mtime also updates atime
if err := os.Chtimes(nodePath, mt, mt); err != nil {
sublog.Error().Err(err).
Time("mtime", mt).
Msg("could not set mtime")
return errors.Wrap(err, "could not set mtime")
}
} else {
sublog.Error().Err(err).
Str("mtime", mtime).
Msg("could not parse mtime")
return errors.Wrap(err, "could not parse mtime")
// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string
func (n *Node) SetMtimeString(mtime string) error {
mt, err := parseMTime(mtime)
if err != nil {
return err
}
return nil
return n.SetMtime(mt)
}

// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(mtime time.Time) error {
nodePath := n.InternalPath()
// updating mtime also updates atime
return os.Chtimes(nodePath, mtime, mtime)
}

// SetEtag sets the temporary etag of a node if it differs from the current etag
Expand Down Expand Up @@ -929,6 +942,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) {
return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10))
}

// GetBlobSize reads the blobsize from the extended attributes
func (n *Node) GetBlobSize() (treesize uint64, err error) {
var b string
if b, err = n.Xattr(xattrs.BlobsizeAttr); err != nil {
return
}
return strconv.ParseUint(b, 10, 64)
}

// SetChecksum writes the checksum with the given checksum type to the extended attributes
func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) {
return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil)))
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ var _ = Describe("Node", func() {
n, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())

blobsize := 239485734
blobsize := int64(239485734)
n.Name = "TestName"
n.BlobID = "TestBlobID"
n.Blobsize = int64(blobsize)
n.Blobsize = blobsize

err = n.WriteAllNodeMetadata()
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expect(n2.Name).To(Equal("TestName"))
Expect(n2.BlobID).To(Equal("TestBlobID"))
Expect(n2.Blobsize).To(Equal(int64(blobsize)))
Expect(n2.Blobsize).To(Equal(blobsize))
})
})

Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/utils/decomposedfs/node/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package node

import (
"context"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -91,6 +93,17 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov
return NoPermissions(), nil
}

// are we reading a revision?
if strings.Contains(n.ID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2)
if len(kp) != 2 {
return NoPermissions(), errtypes.NotFound(n.ID)
}
// use the actual node for the permission assembly
n.ID = kp[0]
}

// check if the current user is the owner
if utils.UserIDEqual(u.Id, n.Owner()) {
return OwnerPermissions(), nil
Expand Down
25 changes: 24 additions & 1 deletion pkg/storage/utils/decomposedfs/node/xattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package node

import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/gofrs/flock"
"github.com/pkg/xattr"
)

Expand All @@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) {
return xattrs.SetMultiple(n.InternalPath(), attribs)
}

// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock
func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) {
// TODO what if writing the lock fails?
if n.xattrsCache != nil {
for k, v := range attribs {
n.xattrsCache[k] = v
}
}

return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock)
}

// SetXattr sets an extended attribute on the write-through cache/node
func (n *Node) SetXattr(key, val string) (err error) {
if n.xattrsCache != nil {
Expand All @@ -43,6 +56,15 @@ func (n *Node) SetXattr(key, val string) (err error) {
return xattrs.Set(n.InternalPath(), key, val)
}

// SetXattrWithLock sets an extended attribute on the write-through cache/node with the given lock
func (n *Node) SetXattrWithLock(key, val string, fileLock *flock.Flock) (err error) {
if n.xattrsCache != nil {
n.xattrsCache[key] = val
}

return xattrs.SetWithLock(n.InternalPath(), key, val, fileLock)
}

// RemoveXattr removes an extended attribute from the write-through cache/node
func (n *Node) RemoveXattr(key string) error {
if n.xattrsCache != nil {
Expand Down Expand Up @@ -80,5 +102,6 @@ func (n *Node) Xattr(key string) (string, error) {
if val, ok := n.xattrsCache[key]; ok {
return val, nil
}
return "", xattr.ENOATTR
// wrap the error as xattr does
return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR}
}
Loading

0 comments on commit cf8ce44

Please sign in to comment.