From cf8ce449fc83d12de2cce78d03a03f38a75022a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?=
Date: Wed, 23 Nov 2022 12:03:42 +0100
Subject: [PATCH] Decomposedfs propagate diff (#3482)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* rewrite finish upload to get atomic size diff

Signed-off-by: Jörn Friedrich Dreyer

* decomposedfs: make finish upload atomic

Signed-off-by: Jörn Friedrich Dreyer

* add lock functions

Signed-off-by: Jörn Friedrich Dreyer

* allow locking non existing files, fix locking with existing lock

Signed-off-by: Jörn Friedrich Dreyer

* make returned error recognizable

Signed-off-by: Jörn Friedrich Dreyer

* more lock fixes

Signed-off-by: Jörn Friedrich Dreyer

* do not log nil error

Co-authored-by: Andre Duffeck

* don't overwrite original error when deleting the blob fails

Signed-off-by: Jörn Friedrich Dreyer

* always release node lock

Signed-off-by: Jörn Friedrich Dreyer

* keep correct mtimes

Signed-off-by: Jörn Friedrich Dreyer

* fix adler checksum

Signed-off-by: Jörn Friedrich Dreyer

* stat before closing

Signed-off-by: Jörn Friedrich Dreyer

* fix lint

... and proper revision download is not covered by the CS3 api

Signed-off-by: Jörn Friedrich Dreyer

* fix permissions when downloading grants

Signed-off-by: Jörn Friedrich Dreyer

* update changelog

Signed-off-by: Jörn Friedrich Dreyer

* fix locks and revision restore

Signed-off-by: Jörn Friedrich Dreyer

* assemble permissions on the node when checking a revision

Signed-off-by: Jörn Friedrich Dreyer

* fix typos

Signed-off-by: Jörn Friedrich Dreyer

* allow revision download when user has initiate download and list revision permission

Signed-off-by: Jörn Friedrich Dreyer

* fix reading revision node

Signed-off-by: Jörn Friedrich Dreyer

* do not forget revision delimiter

Signed-off-by: Jörn Friedrich Dreyer

* drop old revision

Signed-off-by: Jörn Friedrich Dreyer

* remove unexpected failures

Signed-off-by: Jörn Friedrich Dreyer

* update changelog and unexpected passes

Signed-off-by: Jörn Friedrich Dreyer

* propagate size diff

Signed-off-by: Jörn Friedrich Dreyer

* fix move, add changelog

Signed-off-by: Jörn Friedrich Dreyer

* fix size diff calculation for new files

Signed-off-by: Jörn Friedrich Dreyer

* incorporate reviews

Signed-off-by: Jörn Friedrich Dreyer

* update tree mock

Signed-off-by: Jörn Friedrich Dreyer

* fix reva tests

Signed-off-by: Jörn Friedrich Dreyer

* update tree unit tests

Signed-off-by: Jörn Friedrich Dreyer

* clarify revisions code

Signed-off-by: Jörn Friedrich Dreyer

* silence valid return codes

Signed-off-by: Jörn Friedrich Dreyer

* read size before moving node to trash

Signed-off-by: Jörn Friedrich Dreyer

* fix s3ng revision download

Signed-off-by: Jörn Friedrich Dreyer

* clean up revision on error

Signed-off-by: Jörn Friedrich Dreyer

* use verbose newNode name

Signed-off-by: Jörn Friedrich Dreyer

* make FlockFile public and more readable

Signed-off-by: Jörn Friedrich Dreyer

* introduce FileLockSuffix

Signed-off-by: Jörn Friedrich Dreyer

* use filelock package instead of hardcoded strings

Signed-off-by: Jörn Friedrich Dreyer

* make linter happy

Signed-off-by: Jörn Friedrich Dreyer

Signed-off-by: Jörn Friedrich Dreyer
Co-authored-by: Andre Duffeck
---
 .../decomposedfs-finish-upload-rewrite.md | 7 +
 .../decomposedfs-propagate-sizediff.md | 6 +
 internal/http/services/owncloud/ocdav/meta.go | 10 +
 .../utils/decomposedfs/decomposedfs.go | 9 +-
 pkg/storage/utils/decomposedfs/grants.go | 4 +-
 pkg/storage/utils/decomposedfs/metadata.go | 10 +-
 pkg/storage/utils/decomposedfs/mocks/Tree.go | 10 +-
 pkg/storage/utils/decomposedfs/node/node.go | 62 ++--
 .../utils/decomposedfs/node/node_test.go | 6 +-
 .../utils/decomposedfs/node/permissions.go | 13 +
 pkg/storage/utils/decomposedfs/node/xattrs.go | 25 +-
 pkg/storage/utils/decomposedfs/revisions.go | 105 +++++--
 pkg/storage/utils/decomposedfs/spaces.go | 12 +-
 .../utils/decomposedfs/testhelpers/helpers.go | 6 +-
 pkg/storage/utils/decomposedfs/tree/tree.go | 135 +++++---
 .../utils/decomposedfs/tree/tree_test.go | 22 +-
 pkg/storage/utils/decomposedfs/upload.go | 297 ++++++++++++------
 .../utils/decomposedfs/xattrs/xattrs.go | 67 +++-
 pkg/storage/utils/filelocks/filelocks.go | 20 +-
 .../expected-failures-on-OCIS-storage.md | 7 -
 .../expected-failures-on-S3NG-storage.md | 6 -
 21 files changed, 589 insertions(+), 250 deletions(-)
 create mode 100644 changelog/unreleased/decomposedfs-finish-upload-rewrite.md
 create mode 100644 changelog/unreleased/decomposedfs-propagate-sizediff.md

diff --git a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md
new file mode 100644
index 0000000000..834958b94a
--- /dev/null
+++ b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md
@@ -0,0 +1,7 @@
+Bugfix: decomposedfs fix revision download
+
+We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some corner cases, allows us to calculate the size diff atomically and fixes downloading revisions.
+
+https://github.com/cs3org/reva/pull/3473
+https://github.com/owncloud/ocis/issues/765
+https://github.com/owncloud/ocis/issues/3868
\ No newline at end of file
diff --git a/changelog/unreleased/decomposedfs-propagate-sizediff.md b/changelog/unreleased/decomposedfs-propagate-sizediff.md
new file mode 100644
index 0000000000..e0068eef67
--- /dev/null
+++ b/changelog/unreleased/decomposedfs-propagate-sizediff.md
@@ -0,0 +1,6 @@
+Bugfix: decomposedfs propagate sizediff
+
+We now propagate the size diff instead of recalculating the treesize. This fixes slow upload speeds in large folders.
+
+https://github.com/cs3org/reva/pull/3482
+https://github.com/owncloud/ocis/issues/5061
\ No newline at end of file
diff --git a/internal/http/services/owncloud/ocdav/meta.go b/internal/http/services/owncloud/ocdav/meta.go
index a345ffbd8b..69d7da344d 100644
--- a/internal/http/services/owncloud/ocdav/meta.go
+++ b/internal/http/services/owncloud/ocdav/meta.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"net/http"
 	"path"
+	"strings"
 
 	rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
@@ -70,6 +71,15 @@ func (h *MetaHandler) Handler(s *svc) http.Handler {
 			errors.HandleWebdavError(logger, w, b, err)
 			return
 		}
+		if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 {
+			logger := appctx.GetLogger(r.Context())
+			logger.Warn().Str("id", id).Msg("detected invalid : separated resourceid, trying to split it ...
but fix the client that made the request") + // try splitting with : + parts := strings.SplitN(id, ":", 3) + did.StorageId = parts[0] + did.SpaceId = parts[1] + did.OpaqueId = parts[2] + } var head string head, r.URL.Path = router.ShiftPath(r.URL.Path) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 776ceb5d49..ccfb276176 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -30,6 +30,7 @@ import ( "path" "path/filepath" "strconv" + "strings" "syscall" cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1" @@ -87,7 +88,7 @@ type Tree interface { ReadBlob(node *node.Node) (io.ReadCloser, error) DeleteBlob(node *node.Node) error - Propagate(ctx context.Context, node *node.Node) (err error) + Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) } // Decomposedfs provides the base for decomposed filesystem implementations @@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er // Download returns a reader to the specified resource func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) { + // check if we are trying to download a revision + // TODO the CS3 api should allow initiating a revision download + if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) { + return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId) + } + node, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error resolving ref") diff --git a/pkg/storage/utils/decomposedfs/grants.go b/pkg/storage/utils/decomposedfs/grants.go index e3edb30a9e..5fce464203 100644 --- a/pkg/storage/utils/decomposedfs/grants.go +++ b/pkg/storage/utils/decomposedfs/grants.go @@ -220,7 +220,7 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference } } - return fs.tp.Propagate(ctx, node) + return fs.tp.Propagate(ctx, node, 0) } // UpdateGrant updates a grant on a resource @@ -321,7 +321,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide return err } - return fs.tp.Propagate(ctx, n) + return fs.tp.Propagate(ctx, n, 0) } // extractACEsFromAttrs reads ACEs in the list of attrs from the node diff --git a/pkg/storage/utils/decomposedfs/metadata.go b/pkg/storage/utils/decomposedfs/metadata.go index ed9122796c..2ed0cb4ee9 100644 --- a/pkg/storage/utils/decomposedfs/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata.go @@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. if md.Metadata != nil { if val, ok := md.Metadata["mtime"]; ok { delete(md.Metadata, "mtime") - err := n.SetMtime(ctx, val) - if err != nil { + if err := n.SetMtimeString(val); err != nil { errs = append(errs, errors.Wrap(err, "could not set mtime")) } } @@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. // TODO unset when folder is updated or add timestamp to etag? if val, ok := md.Metadata["etag"]; ok { delete(md.Metadata, "etag") - err := n.SetEtag(ctx, val) - if err != nil { + if err := n.SetEtag(ctx, val); err != nil { errs = append(errs, errors.Wrap(err, "could not set etag")) } } @@ -119,7 +117,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. 
switch len(errs) { case 0: - return fs.tp.Propagate(ctx, n) + return fs.tp.Propagate(ctx, n, 0) case 1: // TODO Propagate if anything changed return errs[0] @@ -209,7 +207,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide } switch len(errs) { case 0: - return fs.tp.Propagate(ctx, n) + return fs.tp.Propagate(ctx, n, 0) case 1: // TODO Propagate if anything changed return errs[0] diff --git a/pkg/storage/utils/decomposedfs/mocks/Tree.go b/pkg/storage/utils/decomposedfs/mocks/Tree.go index f47e3c3973..f54c26cecf 100644 --- a/pkg/storage/utils/decomposedfs/mocks/Tree.go +++ b/pkg/storage/utils/decomposedfs/mocks/Tree.go @@ -139,13 +139,13 @@ func (_m *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node return r0 } -// Propagate provides a mock function with given fields: ctx, _a1 -func (_m *Tree) Propagate(ctx context.Context, _a1 *node.Node) error { - ret := _m.Called(ctx, _a1) +// Propagate provides a mock function with given fields: ctx, _a1, sizeDiff +func (_m *Tree) Propagate(ctx context.Context, _a1 *node.Node, sizeDiff int64) error { + ret := _m.Called(ctx, _a1, sizeDiff) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *node.Node) error); ok { - r0 = rf(ctx, _a1) + if rf, ok := ret.Get(0).(func(context.Context, *node.Node, int64) error); ok { + r0 = rf(ctx, _a1, sizeDiff) } else { r0 = ret.Error(0) } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 04acdf444f..dd0f571003 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -215,6 +215,19 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis return r, nil } + // are we reading a revision? 
+ revisionSuffix := "" + if strings.Contains(nodeID, RevisionIDDelimiter) { + // verify revision key format + kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2) + if len(kp) == 2 { + // use the actual node for the metadata lookup + nodeID = kp[0] + // remember revision for blob metadata + revisionSuffix = RevisionIDDelimiter + kp[1] + } + } + // read node n = &Node{ SpaceID: spaceID, @@ -223,6 +236,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis SpaceRoot: r, } + // append back revision to nodeid, even when returning a not existing node + defer func() { + n.ID += revisionSuffix + }() + nodePath := n.InternalPath() // lookup name in extended attributes @@ -237,7 +255,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis n.Exists = true // lookup blobID in extended attributes - n.BlobID, err = n.Xattr(xattrs.BlobIDAttr) + n.BlobID, err = ReadBlobIDAttr(nodePath + revisionSuffix) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -246,7 +264,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } // Lookup blobsize - n.Blobsize, err = ReadBlobSizeAttr(nodePath) + n.Blobsize, err = ReadBlobSizeAttr(nodePath + revisionSuffix) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -495,25 +513,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) { return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil } -// SetMtime sets the mtime and atime of a node -func (n *Node) SetMtime(ctx context.Context, mtime string) error { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() - if mt, err := parseMTime(mtime); err == nil { - nodePath := n.InternalPath() - // updating mtime also updates atime - if err := os.Chtimes(nodePath, mt, mt); err != nil { - sublog.Error().Err(err). - Time("mtime", mt). - Msg("could not set mtime") - return errors.Wrap(err, "could not set mtime") - } - } else { - sublog.Error().Err(err). - Str("mtime", mtime). 
- Msg("could not parse mtime") - return errors.Wrap(err, "could not parse mtime") +// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string +func (n *Node) SetMtimeString(mtime string) error { + mt, err := parseMTime(mtime) + if err != nil { + return err } - return nil + return n.SetMtime(mt) +} + +// SetMtime sets the mtime and atime of a node +func (n *Node) SetMtime(mtime time.Time) error { + nodePath := n.InternalPath() + // updating mtime also updates atime + return os.Chtimes(nodePath, mtime, mtime) } // SetEtag sets the temporary etag of a node if it differs from the current etag @@ -929,6 +942,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) { return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10)) } +// GetBlobSize reads the blobsize from the extended attributes +func (n *Node) GetBlobSize() (treesize uint64, err error) { + var b string + if b, err = n.Xattr(xattrs.BlobsizeAttr); err != nil { + return + } + return strconv.ParseUint(b, 10, 64) +} + // SetChecksum writes the checksum with the given checksum type to the extended attributes func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) { return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil))) diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index a8c192936f..c92ce5b785 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -89,10 +89,10 @@ var _ = Describe("Node", func() { n, err := env.Lookup.NodeFromResource(env.Ctx, ref) Expect(err).ToNot(HaveOccurred()) - blobsize := 239485734 + blobsize := int64(239485734) n.Name = "TestName" n.BlobID = "TestBlobID" - n.Blobsize = int64(blobsize) + n.Blobsize = blobsize err = n.WriteAllNodeMetadata() Expect(err).ToNot(HaveOccurred()) @@ -100,7 +100,7 @@ var _ = Describe("Node", func() { Expect(err).ToNot(HaveOccurred()) Expect(n2.Name).To(Equal("TestName")) Expect(n2.BlobID).To(Equal("TestBlobID")) - Expect(n2.Blobsize).To(Equal(int64(blobsize))) + Expect(n2.Blobsize).To(Equal(blobsize)) }) }) diff --git a/pkg/storage/utils/decomposedfs/node/permissions.go b/pkg/storage/utils/decomposedfs/node/permissions.go index 117b9a72f3..9a076f7f65 100644 --- a/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/pkg/storage/utils/decomposedfs/node/permissions.go @@ -20,10 +20,12 @@ package node import ( "context" + "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -91,6 +93,17 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov return NoPermissions(), nil } + // are we reading a revision? 
+ if strings.Contains(n.ID, RevisionIDDelimiter) { + // verify revision key format + kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2) + if len(kp) != 2 { + return NoPermissions(), errtypes.NotFound(n.ID) + } + // use the actual node for the permission assembly + n.ID = kp[0] + } + // check if the current user is the owner if utils.UserIDEqual(u.Id, n.Owner()) { return OwnerPermissions(), nil diff --git a/pkg/storage/utils/decomposedfs/node/xattrs.go b/pkg/storage/utils/decomposedfs/node/xattrs.go index be65e0d177..3945d1cf63 100644 --- a/pkg/storage/utils/decomposedfs/node/xattrs.go +++ b/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -20,6 +20,7 @@ package node import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/gofrs/flock" "github.com/pkg/xattr" ) @@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) { return xattrs.SetMultiple(n.InternalPath(), attribs) } +// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock +func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) { + // TODO what if writing the lock fails? + if n.xattrsCache != nil { + for k, v := range attribs { + n.xattrsCache[k] = v + } + } + + return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock) +} + // SetXattr sets an extended attribute on the write-through cache/node func (n *Node) SetXattr(key, val string) (err error) { if n.xattrsCache != nil { @@ -43,6 +56,15 @@ func (n *Node) SetXattr(key, val string) (err error) { return xattrs.Set(n.InternalPath(), key, val) } +// SetXattrWithLock sets an extended attribute on the write-through cache/node with the given lock +func (n *Node) SetXattrWithLock(key, val string, fileLock *flock.Flock) (err error) { + if n.xattrsCache != nil { + n.xattrsCache[key] = val + } + + return xattrs.SetWithLock(n.InternalPath(), key, val, fileLock) +} + // RemoveXattr removes an extended attribute from the write-through cache/node func (n *Node) RemoveXattr(key string) error { if n.xattrsCache != nil { @@ -80,5 +102,6 @@ func (n *Node) Xattr(key string) (string, error) { if val, ok := n.xattrsCache[key]; ok { return val, nil } - return "", xattr.ENOATTR + // wrap the error as xattr does + return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR} } diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index a9a7381c21..dab017d137 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -21,7 +21,6 @@ package decomposedfs import ( "context" "io" - iofs "io/fs" "os" "path/filepath" "strings" @@ -31,6 +30,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/pkg/errors" ) @@ -83,7 +83,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen } blobSize, err := node.ReadBlobSizeAttr(items[i]) if err != nil { - return nil, errors.Wrapf(err, "error reading blobsize xattr") + appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0") } rev.Size = uint64(blobSize) etag, err := node.CalculateEtag(np, mtime) @@ -95,10 +95,18 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref 
*provider.Referen
 			}
 		}
 	}
+
+	// maybe we need to sort the list by key
+	/*
+		sort.Slice(revisions, func(i, j int) bool {
+			return revisions[i].Key > revisions[j].Key
+		})
+	*/
+
 	return
 }
 
 // DownloadRevision returns a reader for the specified revision
+// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
 func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) {
 	log := appctx.GetLogger(ctx)
 
@@ -125,7 +133,7 @@
 	switch {
 	case err != nil:
 		return nil, errtypes.InternalError(err.Error())
-	case !rp.ListFileVersions || !rp.RestoreFileVersion || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api?
+	case !rp.ListFileVersions || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api?
 		f, _ := storagespace.FormatReference(ref)
 		if rp.Stat {
 			return nil, errtypes.PermissionDenied(f)
@@ -135,18 +143,26 @@
 
 	contentPath := fs.lu.InternalPath(spaceID, revisionKey)
 
-	r, err := os.Open(contentPath)
+	blobid, err := node.ReadBlobIDAttr(contentPath)
 	if err != nil {
-		if errors.Is(err, iofs.ErrNotExist) {
-			return nil, errtypes.NotFound(contentPath)
-		}
-		return nil, errors.Wrap(err, "Decomposedfs: error opening revision "+revisionKey)
+		return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", revisionKey, n.ID)
+	}
+	blobsize, err := node.ReadBlobSizeAttr(contentPath)
+	if err != nil {
+		return nil, errors.Wrapf(err, "Decomposedfs: could not read blob size of revision '%s' for node '%s'", revisionKey, n.ID)
+	}
+
+	revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore
+
+	reader, err := fs.tp.ReadBlob(&revisionNode)
+	if err != nil {
+		return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", revisionKey, n.ID)
 	}
-	return r, nil
+	return reader, nil
 }
 
 // RestoreRevision restores the specified revision of the resource
-func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (err error) {
+func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (returnErr error) {
 	log := appctx.GetLogger(ctx)
 
 	// verify revision key format
@@ -191,25 +207,74 @@
 	nodePath := fs.lu.InternalPath(spaceID, kp[0])
 	var fi os.FileInfo
 	if fi, err = os.Stat(nodePath); err == nil {
-		// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
-		versionsPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))
+		// revisions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
+		newRevisionPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))
+
+		// touch new revision
+		if file, err := os.Create(newRevisionPath); err != nil {
+			return err
+		} else if err := file.Close(); err != nil {
+			return err
+		}
+		defer func() {
+			if returnErr != nil {
+				if err := os.Remove(newRevisionPath); err != nil {
					log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
+				}
+			}
+		}()
 
-		err = os.Rename(nodePath, versionsPath)
+		// copy blob metadata from node to new revision node
+		err = xattrs.CopyMetadata(nodePath, newRevisionPath, func(attributeName string) bool {
+			return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums
+				attributeName == xattrs.BlobIDAttr ||
+				attributeName == xattrs.BlobsizeAttr
+		})
 		if err != nil {
-			return
+			return errtypes.InternalError("failed to copy blob xattrs to version node")
 		}
 
-		// copy old revision to current location
+		// remember mtime from node as new revision mtime
+		if err = os.Chtimes(newRevisionPath, fi.ModTime(), fi.ModTime()); err != nil {
+			return errtypes.InternalError("failed to change mtime of version node")
+		}
+
+		// update blob id in node
 
-		revisionPath := fs.lu.InternalPath(spaceID, revisionKey)
+		// copy blob metadata from restored revision to node
+		restoredRevisionPath := fs.lu.InternalPath(spaceID, revisionKey)
+		err = xattrs.CopyMetadata(restoredRevisionPath, nodePath, func(attributeName string) bool {
+			return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) ||
+				attributeName == xattrs.BlobIDAttr ||
+				attributeName == xattrs.BlobsizeAttr
+		})
+		if err != nil {
+			return errtypes.InternalError("failed to copy blob xattrs from old revision to node")
+		}
 
-		if err = os.Rename(revisionPath, nodePath); err != nil {
-			return
+		revisionSize, err := xattrs.GetInt64(restoredRevisionPath, xattrs.BlobsizeAttr)
+		if err != nil {
+			return errtypes.InternalError("failed to read blob size xattr from old revision")
 		}
 
-		return fs.tp.Propagate(ctx, n)
+		// drop old revision
+		if err := os.Remove(restoredRevisionPath); err != nil {
+			log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision, continuing")
+		}
+
+		// explicitly update mtime of node as writing xattrs does not change mtime
+		now := time.Now()
+		if err := os.Chtimes(nodePath, now, now); err != nil {
+			return errtypes.InternalError("failed to change mtime of node")
+		}
+
+		// revision 5, current 10 (restore a smaller blob) -> 5-10 = -5
+		// revision 10, current 5 (restore a bigger blob) -> 10-5 = +5
+		sizeDiff := revisionSize - n.Blobsize
+
+		return fs.tp.Propagate(ctx, n, sizeDiff)
 	}
 
 	log.Error().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("original node does not exist")
-	return
+	return nil
 }
diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go
index f8cd0d4254..c8812352f2 100644
--- a/pkg/storage/utils/decomposedfs/spaces.go
+++ b/pkg/storage/utils/decomposedfs/spaces.go
@@ -42,6 +42,7 @@ import (
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
+	"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
 	"github.com/cs3org/reva/v2/pkg/storagespace"
 	"github.com/cs3org/reva/v2/pkg/utils"
@@ -421,7 +422,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
 	for match := range matches {
 		var err error
 		// do not investigate flock files any further. They indicate file locks but are not relevant here.
- if strings.HasSuffix(match, ".flock") { + if strings.HasSuffix(match, filelocks.LockFileSuffix) { continue } // always read link in case storage space id != node id @@ -443,8 +444,13 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide space, err := fs.storageSpaceFromNode(ctx, n, checkNodePermissions) if err != nil { - if _, ok := err.(errtypes.IsPermissionDenied); !ok { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not convert to storage space") + switch err.(type) { + case errtypes.IsPermissionDenied: + // ok + case errtypes.NotFound: + // ok + default: + appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not convert to storage space") } continue } diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 11b40eea15..37213b5bcd 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -228,8 +228,12 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz if err != nil { return nil, err } + if err := n.FindStorageSpaceRoot(); err != nil { + return nil, err + } + + return n, t.Tree.Propagate(context.Background(), n, blobSize) - return n, n.FindStorageSpaceRoot() } // CreateTestStorageSpace will create a storage space with some directories and files diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index e1e458832d..7aab6712ba 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" @@ -282,7 +283,7 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node) error { } } - return t.Propagate(ctx, n) + return t.Propagate(ctx, n, 0) } // CreateDir creates a new directory entry in the tree @@ -302,6 +303,10 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) { return } + if err := n.SetTreeSize(0); err != nil { + return err + } + // make child appear in listings relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) err = os.Symlink(relativeNodePath, filepath.Join(n.ParentInternalPath(), n.Name)) @@ -318,7 +323,7 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) { } return errtypes.AlreadyExists(err.Error()) } - return t.Propagate(ctx, n) + return t.Propagate(ctx, n, 0) } // Move replaces the target with the source @@ -363,7 +368,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) return errors.Wrap(err, "Decomposedfs: could not set name attribute") } - return t.Propagate(ctx, newNode) + return t.Propagate(ctx, newNode, 0) } // we are moving the node to a new parent, any target has been removed @@ -386,15 +391,27 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) return errors.Wrap(err, "Decomposedfs: could not set name attribute") } + // the size diff is the current treesize or blobsize of the old/source node + var sizeDiff int64 + if oldNode.IsDir() { + treeSize, err := oldNode.GetTreeSize() + if err != nil { + return err + } + sizeDiff = 
int64(treeSize) + } else { + sizeDiff = oldNode.Blobsize + } + // TODO inefficient because we might update several nodes twice, only propagate unchanged nodes? // collect in a list, then only stat each node once // also do this in a go routine ... webdav should check the etag async - err = t.Propagate(ctx, oldNode) + err = t.Propagate(ctx, oldNode, -sizeDiff) if err != nil { return errors.Wrap(err, "Decomposedfs: Move: could not propagate old node") } - err = t.Propagate(ctx, newNode) + err = t.Propagate(ctx, newNode, sizeDiff) if err != nil { return errors.Wrap(err, "Decomposedfs: Move: could not propagate new node") } @@ -472,6 +489,17 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { return err } + var sizeDiff int64 + if n.IsDir() { + treesize, err := n.GetTreeSize() + if err != nil { + return err // TODO calculate treesize if it is not set + } + sizeDiff = -int64(treesize) + } else { + sizeDiff = -n.Blobsize + } + deletionTime := time.Now().UTC().Format(time.RFC3339Nano) // Prepare the trash @@ -523,7 +551,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { return } - return t.Propagate(ctx, n) + return t.Propagate(ctx, n, sizeDiff) } // RestoreRecycleItemFunc returns a node and a function to restore it from the trash. @@ -593,7 +621,18 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa if err = os.Remove(trashItem); err != nil { log.Error().Err(err).Str("trashItem", trashItem).Msg("error deleting trash item") } - return t.Propagate(ctx, targetNode) + + var sizeDiff int64 + if recycleNode.IsDir() { + treeSize, err := recycleNode.GetTreeSize() + if err != nil { + return err + } + sizeDiff = int64(treeSize) + } else { + sizeDiff = recycleNode.Blobsize + } + return t.Propagate(ctx, targetNode, sizeDiff) } return recycleNode, parent, fn, nil } @@ -691,8 +730,8 @@ func (t *Tree) removeNode(path string, n *node.Node) error { } // Propagate propagates changes to the root of the tree -func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() +func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) { + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !t.treeTimeAccounting && !t.treeSizeAccounting { // no propagation enabled sublog.Debug().Msg("propagation disabled") @@ -713,7 +752,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { break } - sublog = sublog.With().Interface("node", n).Logger() + sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() // TODO none, sync and async? 
if !n.HasPropagation() { @@ -757,50 +796,62 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { } } - if err := n.UnsetTempEtag(); err != nil { + if err := n.UnsetTempEtag(); err != nil && !xattrs.IsAttrUnset(err) { sublog.Error().Err(err).Msg("could not remove temporary etag attribute") } } // size accounting - if t.treeSizeAccounting { - // update the treesize if it differs from the current size - updateTreeSize := false - - var treeSize, calculatedTreeSize uint64 - calculatedTreeSize, err = calculateTreeSize(ctx, n.InternalPath()) + if t.treeSizeAccounting && sizeDiff != 0 { + // lock node before reading treesize + nodeLock, err := filelocks.AcquireWriteLock(n.InternalPath()) if err != nil { - continue + return err } + // always unlock node + releaseLock := func() { + // ReleaseLock returns nil if already unlocked + if err := filelocks.ReleaseLock(nodeLock); err != nil { + sublog.Err(err).Msg("Decomposedfs: could not unlock parent node") + } + } + defer releaseLock() + + var newSize uint64 - treeSize, err = n.GetTreeSize() + // read treesize + treeSize, err := n.GetTreeSize() switch { + case xattrs.IsAttrUnset(err): + // fallback to calculating the treesize + newSize, err = calculateTreeSize(ctx, n.InternalPath()) + if err != nil { + return err + } case err != nil: - // missing attribute, or invalid format, overwrite - sublog.Debug().Err(err).Msg("could not read treesize attribute, overwriting") - updateTreeSize = true - case treeSize != calculatedTreeSize: - sublog.Debug(). - Uint64("treesize", treeSize). - Uint64("calculatedTreeSize", calculatedTreeSize). - Msg("parent treesize is different then calculated treesize, updating") - updateTreeSize = true + return err default: - sublog.Debug(). - Uint64("treesize", treeSize). - Uint64("calculatedTreeSize", calculatedTreeSize). 
- Msg("parent size matches calculated size, not updating") - } - - if updateTreeSize { - // update the tree time of the parent node - if err = n.SetTreeSize(calculatedTreeSize); err != nil { - sublog.Error().Err(err).Uint64("calculatedTreeSize", calculatedTreeSize).Msg("could not update treesize of parent node") + if sizeDiff > 0 { + newSize = treeSize + uint64(sizeDiff) } else { - sublog.Debug().Uint64("calculatedTreeSize", calculatedTreeSize).Msg("updated treesize of parent node") + newSize = treeSize - uint64(-sizeDiff) } } + + // update the tree time of the node + if err = n.SetXattrWithLock(xattrs.TreesizeAttr, strconv.FormatUint(newSize, 10), nodeLock); err != nil { + return err + } + + // Release node lock early, returns nil if already unlocked + err = filelocks.ReleaseLock(nodeLock) + if err != nil { + return errtypes.InternalError(err.Error()) + } + + sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node") } + } if err != nil { sublog.Error().Err(err).Msg("error propagating") @@ -855,7 +906,6 @@ func calculateTreeSize(ctx context.Context, nodePath string) (uint64, error) { } } return size, err - } // WriteBlob writes a blob to the blobstore @@ -963,6 +1013,11 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) ( return } + // lookup blobSize in extended attributes + if recycleNode.Blobsize, err = xattrs.GetInt64(deletedNodePath, xattrs.BlobsizeAttr); err != nil { + return + } + // lookup parent id in extended attributes if attrStr, err = xattrs.Get(deletedNodePath, xattrs.ParentidAttr); err == nil { recycleNode.ParentID = attrStr diff --git a/pkg/storage/utils/decomposedfs/tree/tree_test.go b/pkg/storage/utils/decomposedfs/tree/tree_test.go index b8a97eea09..69200e7e3a 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree_test.go +++ b/pkg/storage/utils/decomposedfs/tree/tree_test.go @@ -308,7 +308,7 @@ var _ = Describe("Tree", func() { riBefore, err := dir.AsResourceInfo(env.Ctx, &perms, []string{}, []string{}, false) Expect(err).ToNot(HaveOccurred()) - err = env.Tree.Propagate(env.Ctx, file) + err = env.Tree.Propagate(env.Ctx, file, 0) Expect(err).ToNot(HaveOccurred()) dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ @@ -325,10 +325,7 @@ var _ = Describe("Tree", func() { Describe("with TreeSizeAccounting enabled", func() { It("calculates the size", func() { - file, err := env.CreateTestFile("file1", "", dir.ID, dir.SpaceID, 1) - Expect(err).ToNot(HaveOccurred()) - - err = env.Tree.Propagate(env.Ctx, file) + _, err := env.CreateTestFile("file1", "", dir.ID, dir.SpaceID, 1) Expect(err).ToNot(HaveOccurred()) dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ @@ -345,10 +342,7 @@ var _ = Describe("Tree", func() { It("considers all files", func() { _, err := env.CreateTestFile("file1", "", dir.ID, dir.SpaceID, 1) Expect(err).ToNot(HaveOccurred()) - file2, err := env.CreateTestFile("file2", "", dir.ID, dir.SpaceID, 100) - Expect(err).ToNot(HaveOccurred()) - - err = env.Tree.Propagate(env.Ctx, file2) + _, err = env.CreateTestFile("file2", "", dir.ID, dir.SpaceID, 100) Expect(err).ToNot(HaveOccurred()) dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ @@ -367,11 +361,10 @@ var _ = Describe("Tree", func() { Expect(err).ToNot(HaveOccurred()) err = subdir.SetTreeSize(uint64(200)) Expect(err).ToNot(HaveOccurred()) - - file, err := env.CreateTestFile("file1", "", dir.ID, dir.SpaceID, 1) + err = env.Tree.Propagate(env.Ctx, subdir, 200) Expect(err).ToNot(HaveOccurred()) - err = 
+			_, err = env.CreateTestFile("file1", "", dir.ID, dir.SpaceID, 1)
 			Expect(err).ToNot(HaveOccurred())
 
 			dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{
@@ -390,8 +383,7 @@
 			Expect(err).ToNot(HaveOccurred())
 			err = subdir.SetTreeSize(uint64(200))
 			Expect(err).ToNot(HaveOccurred())
-
-			err = env.Tree.Propagate(env.Ctx, subdir)
+			err = env.Tree.Propagate(env.Ctx, subdir, 200)
 			Expect(err).ToNot(HaveOccurred())
 
 			dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{
@@ -412,7 +404,7 @@
 			Expect(err).ToNot(HaveOccurred())
 			err = otherdir.SetTreeSize(uint64(100000))
 			Expect(err).ToNot(HaveOccurred())
-			err = env.Tree.Propagate(env.Ctx, otherdir)
+			err = env.Tree.Propagate(env.Ctx, otherdir, 100000)
 			Expect(err).ToNot(HaveOccurred())
 
 			dir, err = env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{
diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go
index d45c573586..66c62acc6b 100644
--- a/pkg/storage/utils/decomposedfs/upload.go
+++ b/pkg/storage/utils/decomposedfs/upload.go
@@ -48,11 +48,11 @@ import (
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
+	"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
 	"github.com/cs3org/reva/v2/pkg/storagespace"
 	"github.com/cs3org/reva/v2/pkg/utils"
 	"github.com/google/uuid"
 	"github.com/pkg/errors"
-	"github.com/rs/zerolog"
 )
 
 var defaultFilePerm = os.FileMode(0664)
@@ -566,6 +566,20 @@ func (upload *fileUpload) writeInfo() error {
 }
 
 // FinishUpload finishes an upload and moves the file to the internal destination
+//
+// # upload steps
+// check if match header to fail early
+// copy blob
+// lock metadata node
+// check if match header again as safeguard
+// read metadata
+// create version node with current metadata
+// update node metadata with new blobid etc
+// remember size diff
+// unlock metadata
+// propagate size diff and new etag
+// - propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelevant
+// - propagation needs to propagate the diff
 func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
 
 	// ensure cleanup
@@ -578,7 +592,7 @@
 	}
 
 	spaceID := upload.info.Storage["SpaceRoot"]
-	n := node.New(
+	newNode := node.New(
 		spaceID,
 		upload.info.Storage["NodeId"],
 		upload.info.Storage["NodeParentId"],
@@ -588,36 +602,38 @@
 		nil,
 		upload.fs.lu,
 	)
-	n.SpaceRoot = node.New(spaceID, spaceID, "", "", 0, "", nil, upload.fs.lu)
+	newNode.SpaceRoot = node.New(spaceID, spaceID, "", "", 0, "", nil, upload.fs.lu)
 
 	// check lock
 	if upload.info.MetaData["lockid"] != "" {
 		ctx = ctxpkg.ContextSetLockID(ctx, upload.info.MetaData["lockid"])
 	}
-	if err := n.CheckLock(ctx); err != nil {
+	if err := newNode.CheckLock(ctx); err != nil {
 		return err
 	}
 
-	overwrite := n.ID != ""
-	var oldSize uint64
+	overwrite := newNode.ID != ""
+	var oldSize int64
 	if overwrite {
 		// read size from existing node
-		old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, n.ID, false)
-		oldSize = uint64(old.Blobsize)
+		old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, newNode.ID, false)
+		oldSize = old.Blobsize
 	} else {
 		// create new
fileid - n.ID = uuid.New().String() - upload.info.Storage["NodeId"] = n.ID + newNode.ID = uuid.New().String() + upload.info.Storage["NodeId"] = newNode.ID } - if _, err = node.CheckQuota(n.SpaceRoot, overwrite, oldSize, uint64(fi.Size())); err != nil { + if _, err = node.CheckQuota(newNode.SpaceRoot, overwrite, uint64(oldSize), uint64(fi.Size())); err != nil { return err } - targetPath := n.InternalPath() + targetPath := newNode.InternalPath() sublog := appctx.GetLogger(upload.ctx). With(). Interface("info", upload.info). + Str("spaceid", spaceID). + Str("nodeid", newNode.ID). Str("binPath", upload.binPath). Str("targetPath", targetPath). Logger() @@ -665,18 +681,19 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return err } } - n.BlobID = upload.info.ID // This can be changed to a content hash in the future when reference counting for the blobs was added + newNode.BlobID = upload.info.ID // This can be changed to a content hash in the future when reference counting for the blobs was added // defer writing the checksums until the node is in place - // if target exists create new version - versionsPath := "" + // upload steps + // check if match header to fail early + if fi, err = os.Stat(targetPath); err == nil { // When the if-match header was set we need to check if the // etag still matches before finishing the upload. if ifMatch, ok := upload.info.MetaData["if-match"]; ok { var targetEtag string - targetEtag, err = node.CalculateEtag(n.ID, fi.ModTime()) + targetEtag, err = node.CalculateEtag(newNode.ID, fi.ModTime()) if err != nil { return errtypes.InternalError(err.Error()) } @@ -684,85 +701,191 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errtypes.Aborted("etag mismatch") } } - - // FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata... - // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries - versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - - // This move drops all metadata!!! We copy it below with CopyMetadata - // FIXME the node must remain the same. otherwise we might restore share metadata - if err = os.Rename(targetPath, versionsPath); err != nil { - sublog.Err(err). - Str("binPath", upload.binPath). - Str("versionsPath", versionsPath). - Msg("Decomposedfs: could not create version") - return + } else { + // create dir to node + if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { + sublog.Err(err).Msg("could not create node dir") + return errtypes.InternalError("could not create node dir") } - } - // upload the data to the blobstore + // copy blob + file, err := os.Open(upload.binPath) if err != nil { return err } defer file.Close() - err = upload.fs.tp.WriteBlob(n, file) + err = upload.fs.tp.WriteBlob(newNode, file) if err != nil { - return errors.Wrap(err, "failed to upload file to blostore") + return errors.Wrap(err, "failed to upload file to blobstore") } - // now truncate the upload (the payload stays in the blobstore) and move it to the target path - // TODO put uploads on the same underlying storage as the destination dir? - // TODO trigger a workflow as the final rename might eg involve antivirus scanning - if err = os.Truncate(upload.binPath, 0); err != nil { - sublog.Err(err). 
- Msg("Decomposedfs: could not truncate") - return + // prepare discarding the blob if something changed while writing it + discardBlob := func() { + if err := upload.fs.tp.DeleteBlob(newNode); err != nil { + sublog.Err(err).Str("blobid", newNode.BlobID).Msg("Decomposedfs: failed to discard blob in blobstore") + } + } + + // lock metadata node + lock, err := filelocks.AcquireWriteLock(targetPath) + if err != nil { + discardBlob() + return errtypes.InternalError(err.Error()) } - if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { - sublog.Warn().Err(err).Msg("Decomposedfs: could not create node dir, trying to write file anyway") + releaseLock := func() { + // ReleaseLock returns nil if already unlocked + if err := filelocks.ReleaseLock(lock); err != nil { + sublog.Err(err).Msg("Decomposedfs:could not unlock node") + } } - if err = os.Rename(upload.binPath, targetPath); err != nil { - sublog.Error().Err(err).Msg("Decomposedfs: could not rename") - return + defer releaseLock() + + // check if match header again as safeguard + var oldMtime time.Time + versionsPath := "" + if fi, err = os.Stat(targetPath); err == nil { + // When the if-match header was set we need to check if the + // etag still matches before finishing the upload. + if ifMatch, ok := upload.info.MetaData["if-match"]; ok { + var targetEtag string + targetEtag, err = node.CalculateEtag(newNode.ID, fi.ModTime()) + if err != nil { + discardBlob() + return errtypes.InternalError(err.Error()) + } + if ifMatch != targetEtag { + discardBlob() + return errtypes.Aborted("etag mismatch") + } + } + + // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries + versionsPath = upload.fs.lu.InternalPath(spaceID, newNode.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) + + // remember mtime of existing file so we can apply it to the version + oldMtime = fi.ModTime() + } + + // read metadata + + // attributes that will change + attrs := map[string]string{ + xattrs.BlobIDAttr: newNode.BlobID, + xattrs.BlobsizeAttr: strconv.FormatInt(newNode.Blobsize, 10), + + // update checksums + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), } + + // create version node with current metadata + + var newMtime time.Time + // if file already exists if versionsPath != "" { - // copy grant and arbitrary metadata - // FIXME ... now restoring an older revision might bring back a grant that was removed! 
- err = xattrs.CopyMetadata(versionsPath, targetPath, func(attributeName string) bool { - return true - // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties - /* - return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants - strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata - strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites - strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file - */ - }) + // touch version node + file, err := os.Create(versionsPath) if err != nil { - sublog.Info().Err(err).Msg("Decomposedfs: failed to copy xattrs") + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("could not create version node") + return errtypes.InternalError("could not create version node") } - } + defer file.Close() + + fi, err := file.Stat() + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("could not stat version node") + return errtypes.InternalError("could not stat version node") + } + newMtime = fi.ModTime() + + // copy blob metadata to version node + err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }, lock) + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") + return errtypes.InternalError("failed to copy blob xattrs to version node") + } + + // keep mtime from previous version + if err := os.Chtimes(versionsPath, oldMtime, oldMtime); err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to change mtime of version node") + return errtypes.InternalError("failed to change mtime of version node") + } + + // we MUST bypass any cache here as we have to calculate the size diff atomically + oldSize, err = node.ReadBlobSizeAttr(targetPath) + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to read old blobsize") + return errtypes.InternalError("failed to read old blobsize") + } + } else { + // touch metadata node + file, err := os.Create(targetPath) + if err != nil { + discardBlob() + sublog.Err(err).Msg("could not create node") + return errtypes.InternalError("could not create node") + } + file.Close() - // now try write all checksums - tryWritingChecksum(&sublog, n, "sha1", sha1h) - tryWritingChecksum(&sublog, n, "md5", md5h) - tryWritingChecksum(&sublog, n, "adler32", adler32h) + // basic node metadata + attrs[xattrs.ParentidAttr] = newNode.ParentID + attrs[xattrs.NameAttr] = newNode.Name + oldSize = 0 + } - // who will become the owner? the owner of the parent actually ... 
not the currently logged in user - err = n.WriteAllNodeMetadata() + // update node metadata with new blobid etc + err = newNode.SetXattrsWithLock(attrs, lock) if err != nil { + discardBlob() return errors.Wrap(err, "Decomposedfs: could not write metadata") } + // update mtime + switch { + case upload.info.MetaData["mtime"] != "": + if err := newNode.SetMtimeString(upload.info.MetaData["mtime"]); err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not apply mtime from metadata") + return err + } + case newMtime != time.Time{}: + // we are creating a version + if err := newNode.SetMtime(newMtime); err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not change mtime of node") + return err + } + } + + // remember size diff + // old 10, new 5 (upload a smaller file) -> 5-10 = -5 + // old 5, new 10 (upload a bigger file) -> 10-5 = +5 + sizeDiff := newNode.Blobsize - oldSize + + // unlock metadata + err = filelocks.ReleaseLock(lock) + if err != nil { + return errtypes.InternalError(err.Error()) + } + // link child name to parent if it is new - childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) + childNameLink := filepath.Join(newNode.ParentInternalPath(), newNode.Name) + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(newNode.ID, 4, 2)) var link string link, err = os.Readlink(childNameLink) - if err == nil && link != "../"+n.ID { + if err == nil && link != relativeNodePath { sublog.Err(err). - Interface("node", n). + Interface("node", newNode). Str("childNameLink", childNameLink). Str("link", link). Msg("Decomposedfs: child name link has wrong target id, repairing") @@ -771,39 +894,24 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } - if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + if errors.Is(err, iofs.ErrNotExist) || link != relativeNodePath { if err = os.Symlink(relativeNodePath, childNameLink); err != nil { return errors.Wrap(err, "Decomposedfs: could not symlink child entry") } } - // only delete the upload if it was successfully written to the storage - if err = os.Remove(upload.infoPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - sublog.Err(err).Msg("Decomposedfs: could not delete upload info") - return - } - } - // use set arbitrary metadata? 
-	if upload.info.MetaData["mtime"] != "" {
-		err := n.SetMtime(ctx, upload.info.MetaData["mtime"])
-		if err != nil {
-			sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata")
-			return err
-		}
-
-	}
-
 	// fill metadata with current mtime
 	if fi, err = os.Stat(targetPath); err == nil {
 		upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond())
-		upload.info.MetaData["etag"], _ = node.CalculateEtag(n.ID, fi.ModTime())
+		upload.info.MetaData["etag"], _ = node.CalculateEtag(newNode.ID, fi.ModTime())
 	}
 
-	n.Exists = true
+	newNode.Exists = true
 
-	return upload.fs.tp.Propagate(upload.ctx, n)
+	// propagate size diff and new etag
+	// propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelevant
+	sublog.Debug().Int64("sizediff", sizeDiff).Msg("Decomposedfs: propagating size diff")
+	return upload.fs.tp.Propagate(upload.ctx, newNode, sizeDiff)
 }
 
 func (upload *fileUpload) checkHash(expected string, h hash.Hash) error {
@@ -813,15 +921,6 @@
 	}
 	return nil
 }
-func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) {
-	if err := n.SetChecksum(algo, h); err != nil {
-		log.Err(err).
-			Str("csType", algo).
-			Bytes("hash", h.Sum(nil)).
-			Msg("Decomposedfs: could not write checksum")
-		// this is not critical, the bytes are there so we will continue
-	}
-}
 
 func (upload *fileUpload) discardChunk() {
 	if err := os.Remove(upload.binPath); err != nil {
diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go
index 732e8e880c..8bd24bbb98 100644
--- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go
+++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go
@@ -125,16 +125,16 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
 // For the source file, a shared lock is acquired. For the target, an exclusive
 // write lock is acquired.
 func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
-	var writeLock, readLock *flock.Flock
+	var readLock *flock.Flock
 
-	// Acquire the write log on the target node first.
-	writeLock, err = filelocks.AcquireWriteLock(target)
+	// Acquire a read lock on the source node
+	readLock, err = filelocks.AcquireReadLock(src)
 	if err != nil {
-		return errors.Wrap(err, "xattrs: Unable to lock target to write")
+		return errors.Wrap(err, "xattrs: Unable to lock source to read")
 	}
 	defer func() {
-		rerr := filelocks.ReleaseLock(writeLock)
+		rerr := filelocks.ReleaseLock(readLock)
 
 		// if err is non nil we do not overwrite that
 		if err == nil {
@@ -142,14 +142,33 @@
 		}
 	}()
 
-	// now try to get a shared lock on the source
-	readLock, err = filelocks.AcquireReadLock(src)
+	return CopyMetadataWithSourceLock(src, target, filter, readLock)
+}
+
+// CopyMetadataWithSourceLock copies all extended attributes from source to target.
+// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix.
+// The caller must hold at least a read lock on the source file; for the target, an exclusive
+// write lock is acquired.
+func CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) {
+	switch {
+	case readLock == nil:
+		return errors.New("no lock provided")
+	case readLock.Path() != filelocks.FlockFile(src):
+		return errors.New("lockpath does not match filepath")
+	case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock
+		return errors.New("not locked")
+	}
+
+	var writeLock *flock.Flock
+
+	// Acquire the write lock on the target node
+	writeLock, err = filelocks.AcquireWriteLock(target)
 	if err != nil {
-		return errors.Wrap(err, "xattrs: Unable to lock file for read")
+		return errors.Wrap(err, "xattrs: Unable to lock target to write")
 	}
 	defer func() {
-		rerr := filelocks.ReleaseLock(readLock)
+		rerr := filelocks.ReleaseLock(writeLock)
 
 		// if err is non nil we do not overwrite that
 		if err == nil {
@@ -191,7 +210,6 @@ func CopyMetadata(src, target string, filter func(attributeName string) bool) (e
 // Set an extended attribute key to the given value
 func Set(filePath string, key string, val string) (err error) {
 	fileLock, err := filelocks.AcquireWriteLock(filePath)
-
 	if err != nil {
 		return errors.Wrap(err, "xattrs: Can not acquire write log")
 	}
@@ -207,6 +225,21 @@ func Set(filePath string, key string, val string) (err error) {
 	return xattr.Set(filePath, key, []byte(val))
 }
 
+// SetWithLock sets an extended attribute key to the given value, using an existing lock
+func SetWithLock(filePath string, key string, val string, fileLock *flock.Flock) (err error) {
+	// check that the file is write locked
+	switch {
+	case fileLock == nil:
+		return errors.New("no lock provided")
+	case fileLock.Path() != filelocks.FlockFile(filePath):
+		return errors.New("lockpath does not match filepath")
+	case !fileLock.Locked():
+		return errors.New("not write locked")
+	}
+
+	return xattr.Set(filePath, key, []byte(val))
+}
+
 // Remove an extended attribute key
 func Remove(filePath string, key string) (err error) {
 	fileLock, err := filelocks.AcquireWriteLock(filePath)
@@ -246,6 +279,20 @@ func SetMultiple(filePath string, attribs map[string]string) (err error) {
 		}
 	}()
 
+	return SetMultipleWithLock(filePath, attribs, fileLock)
+}
+
+// SetMultipleWithLock allows setting multiple key/value pairs at once, using an existing lock
+func SetMultipleWithLock(filePath string, attribs map[string]string, fileLock *flock.Flock) (err error) {
+	switch {
+	case fileLock == nil:
+		return errors.New("no lock provided")
+	case fileLock.Path() != filelocks.FlockFile(filePath):
+		return errors.New("lockpath does not match filepath")
+	case !fileLock.Locked():
+		return errors.New("not locked")
+	}
+
 	// error handling: Count if there are errors while setting the attribs.
 	// if there were any, return an error.
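
All of the *WithLock variants perform the same defensive checks before touching extended attributes: a lock must actually be handed in, it must point at the node's lock file, and it must really be held. A hedged caller-side sketch of how such a helper is meant to be used with github.com/gofrs/flock (validateWriteLock is a hypothetical stand-in for the checks above, not reva's API):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/gofrs/flock"
)

const lockFileSuffix = ".flock"

// validateWriteLock mirrors the checks the *WithLock helpers perform
// before mutating metadata.
func validateWriteLock(path string, fl *flock.Flock) error {
	switch {
	case fl == nil:
		return errors.New("no lock provided")
	case fl.Path() != path+lockFileSuffix:
		return errors.New("lockpath does not match filepath")
	case !fl.Locked():
		return errors.New("not write locked")
	}
	return nil
}

func main() {
	path := "/tmp/somenode"
	fl := flock.New(path + lockFileSuffix)
	if err := fl.Lock(); err != nil { // blocking exclusive lock
		panic(err)
	}
	defer fl.Unlock()

	// one lock, many writes: each write validates the caller's lock
	// instead of acquiring and releasing its own
	fmt.Println(validateWriteLock(path, fl)) // <nil>
}
```

The design choice is the usual split between a convenience function that owns its lock and a lock-taking variant for callers that batch several metadata writes under one critical section.
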
 	var (
diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go
index f338f881f5..8816cbf1b6 100644
--- a/pkg/storage/utils/filelocks/filelocks.go
+++ b/pkg/storage/utils/filelocks/filelocks.go
@@ -28,6 +28,9 @@ import (
 	"github.com/gofrs/flock"
 )
 
+// LockFileSuffix to use for lock files
+const LockFileSuffix = ".flock"
+
 var (
 	_localLocks sync.Map
 	_lockCycles sync.Once
@@ -80,15 +83,11 @@ func acquireLock(file string, write bool) (*flock.Flock, error) {
 	var err error
 
 	// Create a file to carry the log
-	n := flockFile(file)
+	n := FlockFile(file)
 	if len(n) == 0 {
 		return nil, ErrPathEmpty
 	}
 
-	if _, err = os.Stat(file); err != nil {
-		return nil, err
-	}
-
 	var flock *flock.Flock
 	for i := 1; i <= _lockCyclesValue; i++ {
 		if flock = getMutexedFlock(n); flock != nil {
@@ -127,14 +126,13 @@ func acquireLock(file string, write bool) (*flock.Flock, error) {
 	return flock, nil
 }
 
-// flockFile returns the flock filename for a given file name
+// FlockFile returns the flock filename for a given file name
 // it returns an empty string if the input is empty
-func flockFile(file string) string {
-	var n string
-	if len(file) > 0 {
-		n = file + ".flock"
+func FlockFile(file string) string {
+	if file == "" {
+		return ""
 	}
-	return n
+	return file + LockFileSuffix
 }
 
 // AcquireReadLock tries to acquire a shared lock to read from the
diff --git a/tests/acceptance/expected-failures-on-OCIS-storage.md b/tests/acceptance/expected-failures-on-OCIS-storage.md
index cccfb6a054..df199c77e7 100644
--- a/tests/acceptance/expected-failures-on-OCIS-storage.md
+++ b/tests/acceptance/expected-failures-on-OCIS-storage.md
@@ -284,10 +284,6 @@ _requires a [CS3 user provisioning api that can update the quota for a user](htt
 - [apiWebdavMove2/moveShareOnOcis.feature:169](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L169)
 - [apiWebdavMove2/moveShareOnOcis.feature:170](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L170)
 
-
-#### [restoring an older version of a shared file deletes the share](https://github.com/owncloud/ocis/issues/765)
-- [apiShareManagementToShares/acceptShares.feature:579](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareManagementToShares/acceptShares.feature#L579)
-
 #### [Expiration date for shares is not implemented](https://github.com/owncloud/ocis/issues/1250)
 #### Expiration date of user shares
 - [apiShareCreateSpecialToShares1/createShareExpirationDate.feature:52](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareCreateSpecialToShares1/createShareExpirationDate.feature#L52)
@@ -788,9 +784,6 @@ moving outside of the Shares folder gives 501 Not Implemented.
 - [apiWebdavProperties1/copyFile.feature:437](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavProperties1/copyFile.feature#L437)
 - [apiWebdavProperties1/copyFile.feature:442](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavProperties1/copyFile.feature#L442)
 
-#### [Downloading the older version of shared file gives 404](https://github.com/owncloud/ocis/issues/3868)
-- [apiVersions/fileVersionsSharingToShares.feature:306](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionsSharingToShares.feature#L306)
-
 #### [file versions do not report the version author](https://github.com/owncloud/ocis/issues/2914)
 - [apiVersions/fileVersionAuthor.feature:14](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L14)
 - [apiVersions/fileVersionAuthor.feature:37](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L37)
diff --git a/tests/acceptance/expected-failures-on-S3NG-storage.md b/tests/acceptance/expected-failures-on-S3NG-storage.md
index a9efc55de4..0cf8dde14c 100644
--- a/tests/acceptance/expected-failures-on-S3NG-storage.md
+++ b/tests/acceptance/expected-failures-on-S3NG-storage.md
@@ -9,9 +9,6 @@ Basic file management like up and download, move, copy, properties, quota, trash
 - [apiTrashbin/trashbinFilesFolders.feature:318](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiTrashbin/trashbinFilesFolders.feature#L318)
 - [apiTrashbin/trashbinFilesFolders.feature:323](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiTrashbin/trashbinFilesFolders.feature#L323)
 
-#### [Downloading the older version of shared file gives 404](https://github.com/owncloud/ocis/issues/3868)
-- [apiVersions/fileVersionsSharingToShares.feature:306](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionsSharingToShares.feature#L306)
-
 #### [file versions do not report the version author](https://github.com/owncloud/ocis/issues/2914)
 - [apiVersions/fileVersionAuthor.feature:14](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L14)
 - [apiVersions/fileVersionAuthor.feature:37](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L37)
@@ -293,9 +290,6 @@ _requires a [CS3 user provisioning api that can update the quota for a user](htt
 - [apiWebdavMove2/moveShareOnOcis.feature:169](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L169)
 - [apiWebdavMove2/moveShareOnOcis.feature:170](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L170)
 
-#### [restoring an older version of a shared file deletes the share](https://github.com/owncloud/ocis/issues/765)
-- [apiShareManagementToShares/acceptShares.feature:579](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareManagementToShares/acceptShares.feature#L579)
-
 #### [Expiration date for shares is not implemented](https://github.com/owncloud/ocis/issues/1250)
 #### Expiration date of user shares
 - [apiShareCreateSpecialToShares1/createShareExpirationDate.feature:52](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareCreateSpecialToShares1/createShareExpirationDate.feature#L52)
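
Returning to the xattrs changes above: the wrapper/delegate split means CopyMetadata owns the shared lock on the source, while CopyMetadataWithSourceLock only validates the lock it is handed, and the deferred release is written so it never masks the primary error. A simplified sketch of that shape using github.com/gofrs/flock directly (stand-in functions, not reva's exact API):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/gofrs/flock"
)

// copyMetadata owns the shared lock on the source and hands it down.
func copyMetadata(src, target string) (err error) {
	readLock := flock.New(src + ".flock")
	if err = readLock.RLock(); err != nil { // blocking shared lock
		return err
	}
	defer func() {
		rerr := readLock.Unlock()
		// if err is non-nil we do not overwrite it with the release error
		if err == nil {
			err = rerr
		}
	}()
	return copyMetadataWithSourceLock(src, target, readLock)
}

// copyMetadataWithSourceLock trusts, but verifies, the caller's lock.
func copyMetadataWithSourceLock(src, target string, readLock *flock.Flock) error {
	if readLock == nil || (!readLock.Locked() && !readLock.RLocked()) {
		return errors.New("not locked")
	}
	// ... acquire a write lock on target and copy the xattrs here ...
	return nil
}

func main() {
	fmt.Println(copyMetadata("/tmp/src", "/tmp/dst"))
}
```
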
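The filelocks change is what makes the new upload flow possible at all: with the os.Stat check dropped from acquireLock, a lock can be taken on a node path before the node file itself exists, and the lock file lives next to the target under the new LockFileSuffix. A small sketch of that naming rule (the constant and helper mirror the patch; main is illustrative):

```go
package main

import (
	"fmt"

	"github.com/gofrs/flock"
)

const lockFileSuffix = ".flock"

// flockFileName mirrors FlockFile: the lock file sits next to the target.
func flockFileName(file string) string {
	if file == "" {
		return ""
	}
	return file + lockFileSuffix
}

func main() {
	target := "/tmp/does-not-exist-yet"
	// no stat on target: the lock can be taken before the file exists
	fl := flock.New(flockFileName(target))
	locked, err := fl.TryLock()
	fmt.Println(locked, err) // true <nil>
	_ = fl.Unlock()
}
```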