diff --git a/changelog/unreleased/reduce-lock-contention.md b/changelog/unreleased/reduce-lock-contention.md new file mode 100644 index 0000000000..88dddb4be8 --- /dev/null +++ b/changelog/unreleased/reduce-lock-contention.md @@ -0,0 +1,5 @@ +Enhancement: Reduce lock contention issues + +We reduced lock contention during high load by caching the extended attributes of a file for the duration of a request. + +https://github.com/cs3org/reva/pull/3397 diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index d964837c8c..776ceb5d49 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -341,7 +341,7 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference) if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting { // mark the home node as the end of propagation - if err = n.SetMetadata(xattrs.PropagationAttr, "1"); err != nil { + if err = n.SetXattr(xattrs.PropagationAttr, "1"); err != nil { appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate") // FIXME: This does not return an error at all, but results in a severe situation that the diff --git a/pkg/storage/utils/decomposedfs/grants.go b/pkg/storage/utils/decomposedfs/grants.go index 1efd713f5a..e3edb30a9e 100644 --- a/pkg/storage/utils/decomposedfs/grants.go +++ b/pkg/storage/utils/decomposedfs/grants.go @@ -135,15 +135,21 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) } log := appctx.GetLogger(ctx) np := node.InternalPath() - var attrs []string - if attrs, err = xattrs.List(np); err != nil { + var attrs map[string]string + if attrs, err = node.Xattrs(); err != nil { log.Error().Err(err).Msg("error listing attributes") return nil, err } + attrNames := make([]string, len(attrs)) + i := 0 + for attr := range attrs { + attrNames[i] = attr + i++ + } - log.Debug().Interface("attrs", attrs).Msg("read attributes") + log.Debug().Interface("attrs", attrNames).Msg("read attributes") - aces := extractACEsFromAttrs(ctx, np, attrs) + aces := extractACEsFromAttrs(ctx, np, attrNames) uid := ctxpkg.ContextMustGetUser(ctx).GetId() grants = make([]*provider.Grant, 0, len(aces)) @@ -303,7 +309,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide // set the grant e := ace.FromGrant(g) principal, value := e.Marshal() - if err := n.SetMetadata(xattrs.GrantPrefix+principal, string(value)); err != nil { + if err := n.SetXattr(xattrs.GrantPrefix+principal, string(value)); err != nil { appctx.GetLogger(ctx).Error().Err(err). Str("principal", principal).Msg("Could not set grant for principal") return err diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index c4beaa1791..0bd6ede652 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -134,7 +134,7 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe } if followReferences { - if attrBytes, err := r.GetMetadata(xattrs.ReferenceAttr); err == nil { + if attrBytes, err := r.Xattr(xattrs.ReferenceAttr); err == nil { realNodeID := attrBytes ref, err := xattrs.ReferenceFromAttr([]byte(realNodeID)) if err != nil { @@ -147,7 +147,7 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe } } } - if node.IsSpaceRoot(r) { + if r.IsSpaceRoot() { r.SpaceRoot = r } diff --git a/pkg/storage/utils/decomposedfs/metadata.go b/pkg/storage/utils/decomposedfs/metadata.go index 306b198a27..ed9122796c 100644 --- a/pkg/storage/utils/decomposedfs/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata.go @@ -33,7 +33,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" - "github.com/pkg/xattr" ) // SetArbitraryMetadata sets the metadata on a resource @@ -69,8 +68,6 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. return err } - nodePath := n.InternalPath() - errs := []error{} // TODO should we really continue updating when an error occurs? if md.Metadata != nil { @@ -115,7 +112,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. } for k, v := range md.Metadata { attrName := xattrs.MetadataPrefix + k - if err = xattr.Set(nodePath, attrName, []byte(v)); err != nil { + if err = n.SetXattr(attrName, v); err != nil { errs = append(errs, errors.Wrap(err, "Decomposedfs: could not set metadata attribute "+attrName+" to "+k)) } } @@ -166,7 +163,6 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide return err } - nodePath := n.InternalPath() errs := []error{} for _, k := range keys { switch k { @@ -189,7 +185,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide continue } fa := fmt.Sprintf("%s:%s:%s@%s", xattrs.FavPrefix, utils.UserTypeToString(uid.GetType()), uid.GetOpaqueId(), uid.GetIdp()) - if err := xattrs.Remove(nodePath, fa); err != nil { + if err := n.RemoveXattr(fa); err != nil { if xattrs.IsAttrUnset(err) { continue // already gone, ignore } @@ -200,7 +196,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide errs = append(errs, errors.Wrap(err, "could not unset favorite flag")) } default: - if err = xattrs.Remove(nodePath, xattrs.MetadataPrefix+k); err != nil { + if err = n.RemoveXattr(xattrs.MetadataPrefix + k); err != nil { if xattrs.IsAttrUnset(err) { continue // already gone, ignore } diff --git a/pkg/storage/utils/decomposedfs/node/locks_test.go b/pkg/storage/utils/decomposedfs/node/locks_test.go index 9f359e84cb..b03ee49271 100644 --- a/pkg/storage/utils/decomposedfs/node/locks_test.go +++ b/pkg/storage/utils/decomposedfs/node/locks_test.go @@ -81,8 +81,12 @@ var _ = Describe("Node locks", func() { AppName: "app2", LockId: uuid.New().String(), } - n = node.New("u-s-e-r-id", "tobelockedid", "", "tobelocked", 10, "", env.Owner.Id, env.Lookup) - n2 = node.New("u-s-e-r-id", "neverlockedid", "", "neverlocked", 10, "", env.Owner.Id, env.Lookup) + spaceResID, err := env.CreateTestStorageSpace("project", &provider.Quota{QuotaMaxBytes: 2000}) + Expect(err).ToNot(HaveOccurred()) + n, err = env.CreateTestFile("tobelockedid", "blob", spaceResID.OpaqueId, spaceResID.OpaqueId, 10) + Expect(err).ToNot(HaveOccurred()) + n2, err = env.CreateTestFile("neverlockedlockedid", "blob", spaceResID.OpaqueId, spaceResID.OpaqueId, 10) + Expect(err).ToNot(HaveOccurred()) }) AfterEach(func() { diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 714c54a07e..fa05e41da2 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -82,7 +82,8 @@ type Node struct { Exists bool SpaceRoot *Node - lu PathLookup + lu PathLookup + xattrsCache map[string]string } // PathLookup defines the interface for the lookup component @@ -111,47 +112,19 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, owne // ChangeOwner sets the owner of n to newOwner func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { - rootNodePath := n.SpaceRoot.InternalPath() n.SpaceRoot.owner = new var attribs = map[string]string{xattrs.OwnerIDAttr: new.OpaqueId, xattrs.OwnerIDPAttr: new.Idp, xattrs.OwnerTypeAttr: utils.UserTypeToString(new.Type)} - if err := xattrs.SetMultiple(rootNodePath, attribs); err != nil { + if err := n.SpaceRoot.SetXattrs(attribs); err != nil { return err } return } -// SetMetadata populates a given key with its value. -// Note that consumers should be aware of the metadata options on xattrs.go. -func (n *Node) SetMetadata(key string, val string) (err error) { - nodePath := n.InternalPath() - if err := xattrs.Set(nodePath, key, val); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set extended attribute") - } - return nil -} - -// RemoveMetadata removes a given key -func (n *Node) RemoveMetadata(key string) (err error) { - if err = xattrs.Remove(n.InternalPath(), key); err == nil || xattrs.IsAttrUnset(err) { - return nil - } - return err -} - -// GetMetadata reads the metadata for the given key -func (n *Node) GetMetadata(key string) (val string, err error) { - nodePath := n.InternalPath() - if val, err = xattrs.Get(nodePath, key); err != nil { - return "", errors.Wrap(err, "Decomposedfs: could not get extended attribute") - } - return val, nil -} - // WriteAllNodeMetadata writes the Node metadata to disk func (n *Node) WriteAllNodeMetadata() (err error) { attribs := make(map[string]string) @@ -161,8 +134,7 @@ func (n *Node) WriteAllNodeMetadata() (err error) { attribs[xattrs.BlobIDAttr] = n.BlobID attribs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10) - nodePath := n.InternalPath() - return xattrs.SetMultiple(nodePath, attribs) + return n.SetXattrs(attribs) } // WriteOwner writes the space owner @@ -173,8 +145,7 @@ func (n *Node) WriteOwner(owner *userpb.UserId) error { xattrs.OwnerIDPAttr: owner.Idp, xattrs.OwnerTypeAttr: utils.UserTypeToString(owner.Type), } - nodeRootPath := n.SpaceRoot.InternalPath() - if err := xattrs.SetMultiple(nodeRootPath, attribs); err != nil { + if err := n.SpaceRoot.SetXattrs(attribs); err != nil { return err } n.SpaceRoot.owner = owner @@ -247,7 +218,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis nodePath := n.InternalPath() // lookup name in extended attributes - n.Name, err = xattrs.Get(nodePath, xattrs.NameAttr) + n.Name, err = n.Xattr(xattrs.NameAttr) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -258,7 +229,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis n.Exists = true // lookup blobID in extended attributes - n.BlobID, err = xattrs.Get(nodePath, xattrs.BlobIDAttr) + n.BlobID, err = n.Xattr(xattrs.BlobIDAttr) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -272,11 +243,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false case err != nil: - return nil, err + return nil, errtypes.InternalError(err.Error()) } // lookup parent id in extended attributes - n.ParentID, err = xattrs.Get(nodePath, xattrs.ParentidAttr) + n.ParentID, err = n.Xattr(xattrs.ParentidAttr) switch { case xattrs.IsAttrUnset(err): return nil, errtypes.InternalError(err.Error()) @@ -386,23 +357,20 @@ func (n *Node) Parent() (p *Node, err error) { SpaceRoot: n.SpaceRoot, } - // parentPath := n.lu.InternalPath(spaceID, n.ParentID) - parentPath := p.InternalPath() - // lookup parent id in extended attributes - if p.ParentID, err = xattrs.Get(parentPath, xattrs.ParentidAttr); err != nil { + if p.ParentID, err = p.Xattr(xattrs.ParentidAttr); err != nil { p.ParentID = "" return } // lookup name in extended attributes - if p.Name, err = xattrs.Get(parentPath, xattrs.NameAttr); err != nil { + if p.Name, err = p.Xattr(xattrs.NameAttr); err != nil { p.Name = "" p.ParentID = "" return } // check node exists - if _, err := os.Stat(parentPath); err == nil { + if _, err := os.Stat(p.InternalPath()); err == nil { p.Exists = true } return @@ -416,15 +384,13 @@ func (n *Node) Owner() *userpb.UserId { // readOwner reads the owner from the extended attributes of the space root // in case either owner id or owner idp are unset we return an error and an empty owner object func (n *Node) readOwner() (*userpb.UserId, error) { - owner := &userpb.UserId{} - rootNodePath := n.SpaceRoot.InternalPath() // lookup parent id in extended attributes var attr string var err error // lookup ID in extended attributes - attr, err = xattrs.Get(rootNodePath, xattrs.OwnerIDAttr) + attr, err = n.SpaceRoot.Xattr(xattrs.OwnerIDAttr) switch { case err == nil: owner.OpaqueId = attr @@ -435,7 +401,7 @@ func (n *Node) readOwner() (*userpb.UserId, error) { } // lookup IDP in extended attributes - attr, err = xattrs.Get(rootNodePath, xattrs.OwnerIDPAttr) + attr, err = n.SpaceRoot.Xattr(xattrs.OwnerIDPAttr) switch { case err == nil: owner.Idp = attr @@ -446,7 +412,7 @@ func (n *Node) readOwner() (*userpb.UserId, error) { } // lookup type in extended attributes - attr, err = xattrs.Get(rootNodePath, xattrs.OwnerTypeAttr) + attr, err = n.SpaceRoot.Xattr(xattrs.OwnerTypeAttr) switch { case err == nil: owner.Type = utils.UserTypeMap(attr) @@ -545,12 +511,11 @@ func (n *Node) SetMtime(ctx context.Context, mtime string) error { // SetEtag sets the temporary etag of a node if it differs from the current etag func (n *Node) SetEtag(ctx context.Context, val string) (err error) { sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() - nodePath := n.InternalPath() var tmTime time.Time if tmTime, err = n.GetTMTime(); err != nil { // no tmtime, use mtime var fi os.FileInfo - if fi, err = os.Lstat(nodePath); err != nil { + if fi, err = os.Lstat(n.InternalPath()); err != nil { return } tmTime = fi.ModTime() @@ -569,7 +534,7 @@ func (n *Node) SetEtag(ctx context.Context, val string) (err error) { return nil } // etag is only valid until the calculated etag changes, is part of propagation - return xattrs.Set(nodePath, xattrs.TmpEtagAttr, val) + return n.SetXattr(xattrs.TmpEtagAttr, val) } // SetFavorite sets the favorite for the current user @@ -590,18 +555,17 @@ func (n *Node) SetEtag(ctx context.Context, val string) (err error) { // obviously this only is secure when the u/s/g/a namespaces are not accessible by users in the filesystem // public tags can be mapped to extended attributes func (n *Node) SetFavorite(uid *userpb.UserId, val string) error { - nodePath := n.InternalPath() // the favorite flag is specific to the user, so we need to incorporate the userid fa := fmt.Sprintf("%s:%s:%s@%s", xattrs.FavPrefix, utils.UserTypeToString(uid.GetType()), uid.GetOpaqueId(), uid.GetIdp()) - return xattrs.Set(nodePath, fa, val) + return n.SetXattr(fa, val) } -// IsDir returns true if the note is a directory +// IsDir returns true if the node is a directory func (n *Node) IsDir() bool { nodePath := n.InternalPath() if fi, err := os.Lstat(nodePath); err == nil { if fi.IsDir() { - if _, err = xattrs.Get(nodePath, xattrs.ReferenceAttr); err != nil { + if _, err := n.Xattr(xattrs.ReferenceAttr); err != nil { return true } } @@ -626,7 +590,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi var target string switch { case fi.IsDir(): - if target, err = xattrs.Get(nodePath, xattrs.ReferenceAttr); err == nil { + if target, err = n.Xattr(xattrs.ReferenceAttr); err == nil { nodeType = provider.ResourceType_RESOURCE_TYPE_REFERENCE } else { nodeType = provider.ResourceType_RESOURCE_TYPE_CONTAINER @@ -690,7 +654,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi } // use temporary etag if it is set - if b, err := xattrs.Get(nodePath, xattrs.TmpEtagAttr); err == nil { + if b, err := n.Xattr(xattrs.TmpEtagAttr); err == nil { ri.Etag = fmt.Sprintf(`"%x"`, b) // TODO why do we convert string(b)? is the temporary etag stored as string? -> should we use bytes? use hex.EncodeToString? } else if ri.Etag, err = calculateEtag(n.ID, tmTime); err != nil { sublog.Debug().Err(err).Msg("could not calculate etag") @@ -733,7 +697,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi // the favorite flag is specific to the user, so we need to incorporate the userid if uid := u.GetId(); uid != nil { fa := fmt.Sprintf("%s:%s:%s@%s", xattrs.FavPrefix, utils.UserTypeToString(uid.GetType()), uid.GetOpaqueId(), uid.GetIdp()) - if val, err := xattrs.Get(nodePath, fa); err == nil { + if val, err := n.Xattr(fa); err == nil { sublog.Debug(). Str("favorite", fa). Msg("found favorite flag") @@ -782,38 +746,32 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi if _, ok := mdKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) { // TODO which checksum was requested? sha1 adler32 or md5? for now hardcode sha1? // TODO make ResourceInfo carry multiple checksums - readChecksumIntoResourceChecksum(ctx, nodePath, storageprovider.XSSHA1, ri) - readChecksumIntoOpaque(ctx, nodePath, storageprovider.XSMD5, ri) - readChecksumIntoOpaque(ctx, nodePath, storageprovider.XSAdler32, ri) + n.readChecksumIntoResourceChecksum(ctx, storageprovider.XSSHA1, ri) + n.readChecksumIntoOpaque(ctx, storageprovider.XSMD5, ri) + n.readChecksumIntoOpaque(ctx, storageprovider.XSAdler32, ri) } // quota // FIXME move to fieldmask if _, ok := mdKeysMap[QuotaKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER) && returnAllMetadata || ok { if n.SpaceRoot != nil && n.SpaceRoot.InternalPath() != "" { - readQuotaIntoOpaque(ctx, n.SpaceRoot.InternalPath(), ri) + n.SpaceRoot.readQuotaIntoOpaque(ctx, ri) } } // only read the requested metadata attributes - attrs, err := xattrs.List(nodePath) + attrs, err := n.Xattrs() if err != nil { sublog.Error().Err(err).Msg("error getting list of extended attributes") } else { - for i := range attrs { + for key, value := range attrs { // filter out non-custom properties - if !strings.HasPrefix(attrs[i], xattrs.MetadataPrefix) { + if !strings.HasPrefix(key, xattrs.MetadataPrefix) { continue } // only read when key was requested - k := attrs[i][len(xattrs.MetadataPrefix):] + k := key[len(xattrs.MetadataPrefix):] if _, ok := mdKeysMap[k]; returnAllMetadata || ok { - if val, err := xattrs.Get(nodePath, attrs[i]); err == nil { - metadata[k] = val - } else { - sublog.Error().Err(err). - Str("entry", attrs[i]). - Msg("error retrieving xattr metadata") - } + metadata[k] = value } } @@ -829,8 +787,8 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi return ri, nil } -func readChecksumIntoResourceChecksum(ctx context.Context, nodePath, algo string, ri *provider.ResourceInfo) { - v, err := xattrs.Get(nodePath, xattrs.ChecksumPrefix+algo) +func (n *Node) readChecksumIntoResourceChecksum(ctx context.Context, algo string, ri *provider.ResourceInfo) { + v, err := n.Xattr(xattrs.ChecksumPrefix + algo) switch { case err == nil: ri.Checksum = &provider.ResourceChecksum{ @@ -838,16 +796,14 @@ func readChecksumIntoResourceChecksum(ctx context.Context, nodePath, algo string Sum: hex.EncodeToString([]byte(v)), } case xattrs.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", nodePath).Str("algorithm", algo).Msg("checksum not set") - case xattrs.IsNotExist(err): - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Str("algorithm", algo).Msg("file not found") + appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Str("algorithm", algo).Msg("could not read checksum") + appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") } } -func readChecksumIntoOpaque(ctx context.Context, nodePath, algo string, ri *provider.ResourceInfo) { - v, err := xattrs.Get(nodePath, xattrs.ChecksumPrefix+algo) +func (n *Node) readChecksumIntoOpaque(ctx context.Context, algo string, ri *provider.ResourceInfo) { + v, err := n.Xattr(xattrs.ChecksumPrefix + algo) switch { case err == nil: if ri.Opaque == nil { @@ -860,17 +816,15 @@ func readChecksumIntoOpaque(ctx context.Context, nodePath, algo string, ri *prov Value: []byte(hex.EncodeToString([]byte(v))), } case xattrs.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", nodePath).Str("algorithm", algo).Msg("checksum not set") - case xattrs.IsNotExist(err): - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Str("algorithm", algo).Msg("file not found") + appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Str("algorithm", algo).Msg("could not read checksum") + appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") } } // quota is always stored on the root node -func readQuotaIntoOpaque(ctx context.Context, nodePath string, ri *provider.ResourceInfo) { - v, err := xattrs.Get(nodePath, xattrs.QuotaAttr) +func (n *Node) readQuotaIntoOpaque(ctx context.Context, ri *provider.ResourceInfo) { + v, err := n.Xattr(xattrs.QuotaAttr) switch { case err == nil: // make sure we have a proper signed int @@ -889,20 +843,18 @@ func readQuotaIntoOpaque(ctx context.Context, nodePath string, ri *provider.Reso Value: []byte(v), } } else { - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Str("quota", v).Msg("malformed quota") + appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("quota", v).Msg("malformed quota") } case xattrs.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", nodePath).Msg("quota not set") - case xattrs.IsNotExist(err): - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Msg("file not found when reading quota") + appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Msg("quota not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", nodePath).Msg("could not read quota") + appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Msg("could not read quota") } } // HasPropagation checks if the propagation attribute exists and is set to "1" func (n *Node) HasPropagation() (propagation bool) { - if b, err := xattrs.Get(n.InternalPath(), xattrs.PropagationAttr); err == nil { + if b, err := n.Xattr(xattrs.PropagationAttr); err == nil { return b == "1" } return false @@ -911,7 +863,7 @@ func (n *Node) HasPropagation() (propagation bool) { // GetTMTime reads the tmtime from the extended attributes func (n *Node) GetTMTime() (tmTime time.Time, err error) { var b string - if b, err = xattrs.Get(n.InternalPath(), xattrs.TreeMTimeAttr); err != nil { + if b, err = n.Xattr(xattrs.TreeMTimeAttr); err != nil { return } return time.Parse(time.RFC3339Nano, b) @@ -920,20 +872,16 @@ func (n *Node) GetTMTime() (tmTime time.Time, err error) { // SetTMTime writes the UTC tmtime to the extended attributes or removes the attribute if nil is passed func (n *Node) SetTMTime(t *time.Time) (err error) { if t == nil { - err = xattrs.Remove(n.InternalPath(), xattrs.TreeMTimeAttr) - if xattrs.IsAttrUnset(err) { - return nil - } - return err + return n.RemoveXattr(xattrs.TreeMTimeAttr) } - return xattrs.Set(n.InternalPath(), xattrs.TreeMTimeAttr, t.UTC().Format(time.RFC3339Nano)) + return n.SetXattr(xattrs.TreeMTimeAttr, t.UTC().Format(time.RFC3339Nano)) } // GetDTime reads the dtime from the extended attributes func (n *Node) GetDTime() (tmTime time.Time, err error) { - var b string - if b, err = xattrs.Get(n.InternalPath(), xattrs.DTimeAttr); err != nil { - return + b, err := n.Xattr(xattrs.DTimeAttr) + if err != nil { + return time.Time{}, err } return time.Parse(time.RFC3339Nano, b) } @@ -941,13 +889,9 @@ func (n *Node) GetDTime() (tmTime time.Time, err error) { // SetDTime writes the UTC dtime to the extended attributes or removes the attribute if nil is passed func (n *Node) SetDTime(t *time.Time) (err error) { if t == nil { - err = xattrs.Remove(n.InternalPath(), xattrs.DTimeAttr) - if xattrs.IsAttrUnset(err) { - return nil - } - return err + return n.RemoveXattr(xattrs.DTimeAttr) } - return xattrs.Set(n.InternalPath(), xattrs.DTimeAttr, t.UTC().Format(time.RFC3339Nano)) + return n.SetXattr(xattrs.DTimeAttr, t.UTC().Format(time.RFC3339Nano)) } // IsDisabled returns true when the node has a dmtime attribute set @@ -963,7 +907,7 @@ func (n *Node) IsDisabled() bool { // GetTreeSize reads the treesize from the extended attributes func (n *Node) GetTreeSize() (treesize uint64, err error) { var b string - if b, err = xattrs.Get(n.InternalPath(), xattrs.TreesizeAttr); err != nil { + if b, err = n.Xattr(xattrs.TreesizeAttr); err != nil { return } return strconv.ParseUint(b, 10, 64) @@ -971,21 +915,17 @@ func (n *Node) GetTreeSize() (treesize uint64, err error) { // SetTreeSize writes the treesize to the extended attributes func (n *Node) SetTreeSize(ts uint64) (err error) { - return n.SetMetadata(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10)) + return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10)) } // SetChecksum writes the checksum with the given checksum type to the extended attributes func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) { - return n.SetMetadata(xattrs.ChecksumPrefix+csType, string(h.Sum(nil))) + return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil))) } // UnsetTempEtag removes the temporary etag attribute func (n *Node) UnsetTempEtag() (err error) { - err = xattrs.Remove(n.InternalPath(), xattrs.TmpEtagAttr) - if xattrs.IsAttrUnset(err) { - return nil - } - return err + return n.RemoveXattr(xattrs.TmpEtagAttr) } // ReadUserPermissions will assemble the permissions for the current user on the given node without parent nodes @@ -1082,14 +1022,14 @@ func (n *Node) IsDenied(ctx context.Context) bool { // We don't want to wast time and memory by creating grantee objects. // The function will return a list of opaque strings that can be used to make a ReadGrant call func (n *Node) ListGrantees(ctx context.Context) (grantees []string, err error) { - attrs, err := xattrs.List(n.InternalPath()) + attrs, err := n.Xattrs() if err != nil { appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Msg("error listing attributes") return nil, err } - for i := range attrs { - if strings.HasPrefix(attrs[i], xattrs.GrantPrefix) { - grantees = append(grantees, attrs[i]) + for name := range attrs { + if strings.HasPrefix(name, xattrs.GrantPrefix) { + grantees = append(grantees, name) } } return @@ -1097,12 +1037,12 @@ func (n *Node) ListGrantees(ctx context.Context) (grantees []string, err error) // ReadGrant reads a CS3 grant func (n *Node) ReadGrant(ctx context.Context, grantee string) (g *provider.Grant, err error) { - var b string - if b, err = xattrs.Get(n.InternalPath(), grantee); err != nil { + xattr, err := n.Xattr(grantee) + if err != nil { return nil, err } var e *ace.ACE - if e, err = ace.Unmarshal(strings.TrimPrefix(grantee, xattrs.GrantPrefix), []byte(b)); err != nil { + if e, err = ace.Unmarshal(strings.TrimPrefix(grantee, xattrs.GrantPrefix), []byte(xattr)); err != nil { return nil, err } return e.Grant(), nil @@ -1199,7 +1139,7 @@ func (n *Node) FindStorageSpaceRoot() error { // remember the node we ask for and use parent to climb the tree parent := n for { - if IsSpaceRoot(parent) { + if parent.IsSpaceRoot() { n.SpaceRoot = parent break } @@ -1211,12 +1151,9 @@ func (n *Node) FindStorageSpaceRoot() error { } // IsSpaceRoot checks if the node is a space root -func IsSpaceRoot(r *Node) bool { - path := r.InternalPath() - if _, err := xattrs.Get(path, xattrs.SpaceNameAttr); err == nil { - return true - } - return false +func (n *Node) IsSpaceRoot() bool { + _, err := n.Xattr(xattrs.SpaceNameAttr) + return err == nil } // CheckQuota checks if both disk space and available quota are sufficient @@ -1229,7 +1166,7 @@ var CheckQuota = func(spaceRoot *Node, overwrite bool, oldSize, newSize uint64) if !enoughDiskSpace(spaceRoot.InternalPath(), newSize) { return false, errtypes.InsufficientStorage("disk full") } - quotaByteStr, _ := xattrs.Get(spaceRoot.InternalPath(), xattrs.QuotaAttr) + quotaByteStr, _ := spaceRoot.Xattr(xattrs.QuotaAttr) if quotaByteStr == "" || quotaByteStr == QuotaUnlimited { // if quota is not set, it means unlimited return true, nil diff --git a/pkg/storage/utils/decomposedfs/node/xattrs.go b/pkg/storage/utils/decomposedfs/node/xattrs.go new file mode 100644 index 0000000000..be65e0d177 --- /dev/null +++ b/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -0,0 +1,84 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package node + +import ( + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/pkg/xattr" +) + +// SetXattrs sets multiple extended attributes on the write-through cache/node +func (n *Node) SetXattrs(attribs map[string]string) (err error) { + if n.xattrsCache != nil { + for k, v := range attribs { + n.xattrsCache[k] = v + } + } + + return xattrs.SetMultiple(n.InternalPath(), attribs) +} + +// SetXattr sets an extended attribute on the write-through cache/node +func (n *Node) SetXattr(key, val string) (err error) { + if n.xattrsCache != nil { + n.xattrsCache[key] = val + } + + return xattrs.Set(n.InternalPath(), key, val) +} + +// RemoveXattr removes an extended attribute from the write-through cache/node +func (n *Node) RemoveXattr(key string) error { + if n.xattrsCache != nil { + delete(n.xattrsCache, key) + } + return xattrs.Remove(n.InternalPath(), key) +} + +// Xattrs returns the extended attributes of the node. If the attributes have already +// been cached they are not read from disk again. +func (n *Node) Xattrs() (map[string]string, error) { + if n.xattrsCache != nil { + return n.xattrsCache, nil + } + + attrs, err := xattrs.All(n.InternalPath()) + if err != nil { + return nil, err + } + n.xattrsCache = attrs + return n.xattrsCache, nil +} + +// Xattr returns an extended attribute of the node. If the attributes have already +// been cached it is not read from disk again. +func (n *Node) Xattr(key string) (string, error) { + if n.xattrsCache == nil { + b, err := xattr.Get(n.InternalPath(), key) + if err != nil { + return "", err + } + return string(b), nil + } + + if val, ok := n.xattrsCache[key]; ok { + return val, nil + } + return "", xattr.ENOATTR +} diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index fcf9ca7508..96678b149e 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -140,7 +140,7 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr metadata[xattrs.SpaceAliasAttr] = alias } - if err := xattrs.SetMultiple(root.InternalPath(), metadata); err != nil { + if err := root.SetXattrs(metadata); err != nil { return nil, err } @@ -598,7 +598,7 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up } } - err = xattrs.SetMultiple(node.InternalPath(), metadata) + err = node.SetXattrs(metadata) if err != nil { return nil, err } @@ -639,7 +639,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De return err } - st, err := n.SpaceRoot.GetMetadata(xattrs.SpaceTypeAttr) + st, err := n.SpaceRoot.Xattr(xattrs.SpaceTypeAttr) if err != nil { return errtypes.InternalError(fmt.Sprintf("space %s does not have a spacetype, possible corrupt decompsedfs", n.ID)) } @@ -667,7 +667,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De return errtypes.NewErrtypeFromStatus(status.NewInvalid(ctx, "can't purge enabled space")) } - spaceType, err := n.GetMetadata(xattrs.SpaceTypeAttr) + spaceType, err := n.Xattr(xattrs.SpaceTypeAttr) if err != nil { return err } @@ -772,7 +772,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, var err error // TODO apply more filters var sname string - if sname, err = n.SpaceRoot.GetMetadata(xattrs.SpaceNameAttr); err != nil { + if sname, err = n.SpaceRoot.Xattr(xattrs.SpaceNameAttr); err != nil { // FIXME: Is that a severe problem? appctx.GetLogger(ctx).Debug().Err(err).Msg("space does not have a name attribute") } @@ -836,7 +836,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, // Mtime is set either as node.tmtime or as fi.mtime below } - if space.SpaceType, err = n.SpaceRoot.GetMetadata(xattrs.SpaceTypeAttr); err != nil { + if space.SpaceType, err = n.SpaceRoot.Xattr(xattrs.SpaceTypeAttr); err != nil { appctx.GetLogger(ctx).Debug().Err(err).Msg("space does not have a type attribute") } @@ -879,7 +879,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, Value: []byte(etag), } - spaceAttributes, err := xattrs.All(n.InternalPath()) + spaceAttributes, err := n.Xattrs() if err != nil { return nil, err } diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 8c6724d6fb..e1e458832d 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -343,8 +343,6 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) // Always target the old node ID for xattr updates. // The new node id is empty if the target does not exist // and we need to overwrite the new one when overwriting an existing path. - tgtPath := oldNode.InternalPath() - // are we just renaming (parent stays the same)? if oldNode.ParentID == newNode.ParentID { @@ -361,7 +359,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) } // update name attribute - if err := xattrs.Set(tgtPath, xattrs.NameAttr, newNode.Name); err != nil { + if err := oldNode.SetXattr(xattrs.NameAttr, newNode.Name); err != nil { return errors.Wrap(err, "Decomposedfs: could not set name attribute") } @@ -381,10 +379,10 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) } // update target parentid and name - if err := xattrs.Set(tgtPath, xattrs.ParentidAttr, newNode.ParentID); err != nil { + if err := oldNode.SetXattr(xattrs.ParentidAttr, newNode.ParentID); err != nil { return errors.Wrap(err, "Decomposedfs: could not set parentid attribute") } - if err := xattrs.Set(tgtPath, xattrs.NameAttr, newNode.Name); err != nil { + if err := oldNode.SetXattr(xattrs.NameAttr, newNode.Name); err != nil { return errors.Wrap(err, "Decomposedfs: could not set name attribute") } @@ -470,7 +468,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { // set origin location in metadata nodePath := n.InternalPath() - if err := n.SetMetadata(xattrs.TrashOriginAttr, origin); err != nil { + if err := n.SetXattr(xattrs.TrashOriginAttr, origin); err != nil { return err } @@ -480,7 +478,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { trashLink := filepath.Join(t.root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil { // Roll back changes - _ = n.RemoveMetadata(xattrs.TrashOriginAttr) + _ = n.RemoveXattr(xattrs.TrashOriginAttr) return err } @@ -493,7 +491,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { err = os.Symlink("../../../../../nodes/"+lookup.Pathify(n.ID, 4, 2)+node.TrashIDDelimiter+deletionTime, trashLink) if err != nil { // Roll back changes - _ = n.RemoveMetadata(xattrs.TrashOriginAttr) + _ = n.RemoveXattr(xattrs.TrashOriginAttr) return } @@ -506,7 +504,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { // To roll back changes // TODO remove symlink // Roll back changes - _ = n.RemoveMetadata(xattrs.TrashOriginAttr) + _ = n.RemoveXattr(xattrs.TrashOriginAttr) return } @@ -521,7 +519,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { // TODO revert the rename // TODO remove symlink // Roll back changes - _ = n.RemoveMetadata(xattrs.TrashOriginAttr) + _ = n.RemoveXattr(xattrs.TrashOriginAttr) return } @@ -580,13 +578,13 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa targetNode.Exists = true // update name attribute - if err := recycleNode.SetMetadata(xattrs.NameAttr, targetNode.Name); err != nil { + if err := recycleNode.SetXattr(xattrs.NameAttr, targetNode.Name); err != nil { return errors.Wrap(err, "Decomposedfs: could not set name attribute") } // set ParentidAttr to restorePath's node parent id if trashPath != "" { - if err := recycleNode.SetMetadata(xattrs.ParentidAttr, targetNode.ParentID); err != nil { + if err := recycleNode.SetXattr(xattrs.ParentidAttr, targetNode.ParentID); err != nil { return errors.Wrap(err, "Decomposedfs: could not set name attribute") } } diff --git a/pkg/storage/utils/decomposedfs/tree/tree_test.go b/pkg/storage/utils/decomposedfs/tree/tree_test.go index 2505002328..b8a97eea09 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree_test.go +++ b/pkg/storage/utils/decomposedfs/tree/tree_test.go @@ -311,6 +311,12 @@ var _ = Describe("Tree", func() { err = env.Tree.Propagate(env.Ctx, file) Expect(err).ToNot(HaveOccurred()) + dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ + StorageId: dir.SpaceID, + SpaceId: dir.SpaceID, + OpaqueId: dir.ID, + }) + Expect(err).ToNot(HaveOccurred()) riAfter, err := dir.AsResourceInfo(env.Ctx, &perms, []string{}, []string{}, false) Expect(err).ToNot(HaveOccurred()) Expect(riAfter.Etag).ToNot(Equal(riBefore.Etag)) @@ -324,6 +330,13 @@ var _ = Describe("Tree", func() { err = env.Tree.Propagate(env.Ctx, file) Expect(err).ToNot(HaveOccurred()) + + dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ + StorageId: dir.SpaceID, + SpaceId: dir.SpaceID, + OpaqueId: dir.ID, + }) + Expect(err).ToNot(HaveOccurred()) size, err := dir.GetTreeSize() Expect(err).ToNot(HaveOccurred()) Expect(size).To(Equal(uint64(1))) @@ -337,6 +350,13 @@ var _ = Describe("Tree", func() { err = env.Tree.Propagate(env.Ctx, file2) Expect(err).ToNot(HaveOccurred()) + + dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ + StorageId: dir.SpaceID, + SpaceId: dir.SpaceID, + OpaqueId: dir.ID, + }) + Expect(err).ToNot(HaveOccurred()) size, err := dir.GetTreeSize() Expect(err).ToNot(HaveOccurred()) Expect(size).To(Equal(uint64(101))) @@ -353,6 +373,13 @@ var _ = Describe("Tree", func() { err = env.Tree.Propagate(env.Ctx, file) Expect(err).ToNot(HaveOccurred()) + + dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ + StorageId: dir.SpaceID, + SpaceId: dir.SpaceID, + OpaqueId: dir.ID, + }) + Expect(err).ToNot(HaveOccurred()) size, err := dir.GetTreeSize() Expect(err).ToNot(HaveOccurred()) Expect(size).To(Equal(uint64(201))) @@ -366,6 +393,13 @@ var _ = Describe("Tree", func() { err = env.Tree.Propagate(env.Ctx, subdir) Expect(err).ToNot(HaveOccurred()) + + dir, err := env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ + StorageId: dir.SpaceID, + SpaceId: dir.SpaceID, + OpaqueId: dir.ID, + }) + Expect(err).ToNot(HaveOccurred()) size, err := dir.GetTreeSize() Expect(size).To(Equal(uint64(200))) Expect(err).ToNot(HaveOccurred()) @@ -381,6 +415,12 @@ var _ = Describe("Tree", func() { err = env.Tree.Propagate(env.Ctx, otherdir) Expect(err).ToNot(HaveOccurred()) + dir, err = env.Lookup.NodeFromID(env.Ctx, &provider.ResourceId{ + StorageId: dir.SpaceID, + SpaceId: dir.SpaceID, + OpaqueId: dir.ID, + }) + Expect(err).ToNot(HaveOccurred()) size, err = dir.GetTreeSize() Expect(err).ToNot(HaveOccurred()) Expect(size).To(Equal(uint64(200))) diff --git a/pkg/storage/utils/decomposedfs/xattrs/errors.go b/pkg/storage/utils/decomposedfs/xattrs/errors.go index 4733ce53b9..b967b6099d 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/errors.go +++ b/pkg/storage/utils/decomposedfs/xattrs/errors.go @@ -19,6 +19,7 @@ package xattrs import ( + "os" "syscall" "github.com/pkg/errors" @@ -28,6 +29,9 @@ import ( // IsNotExist checks if there is a os not exists error buried inside the xattr error, // as we cannot just use os.IsNotExist(). func IsNotExist(err error) bool { + if os.IsNotExist(errors.Cause(err)) { + return true + } if xerr, ok := errors.Cause(err).(*xattr.Error); ok { if serr, ok2 := xerr.Err.(syscall.Errno); ok2 { return serr == syscall.ENOENT diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go index 9911f0d6fa..f338f881f5 100644 --- a/pkg/storage/utils/filelocks/filelocks.go +++ b/pkg/storage/utils/filelocks/filelocks.go @@ -85,6 +85,10 @@ func acquireLock(file string, write bool) (*flock.Flock, error) { return nil, ErrPathEmpty } + if _, err = os.Stat(file); err != nil { + return nil, err + } + var flock *flock.Flock for i := 1; i <= _lockCyclesValue; i++ { if flock = getMutexedFlock(n); flock != nil {