Skip to content

Commit

Permalink
implement filter & refactor spacetypes handling
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Richter <crichter@owncloud.com>
  • Loading branch information
dragonchaser committed Jul 8, 2022
1 parent 06302f7 commit cd399c4
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 86 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/add-user-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add user filter

This PR adds the ability to filter spaces by user-id

https://github.com/owncloud/ocis/pull/4072
2 changes: 2 additions & 0 deletions pkg/storage/fs/owncloudsql/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func (fs *owncloudsqlfs) ListStorageSpaces(ctx context.Context, filter []*provid
filteringUnsupportedSpaceTypes = (t != "personal" && !strings.HasPrefix(t, "+"))
case provider.ListStorageSpacesRequest_Filter_TYPE_ID:
spaceID, _, _ = storagespace.SplitID(filter[i].GetId().OpaqueId)
case provider.ListStorageSpacesRequest_Filter_TYPE_USER:
spaceID, _, _ = storagespace.SplitID(filter[i].GetId().OpaqueId)
}
}
if filteringUnsupportedSpaceTypes {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/registry/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (r *registry) buildFilters(filterMap map[string]string) []*providerpb.ListS
})
}
}
if filterMap["user_id"] != "" && filterMap["user_idp"] != "" {
if filterMap["user_id"] != "" {
filters = append(filters, &providerpb.ListStorageSpacesRequest_Filter{
Type: providerpb.ListStorageSpacesRequest_Filter_TYPE_USER,
Term: &providerpb.ListStorageSpacesRequest_Filter_User{
Expand Down
126 changes: 76 additions & 50 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
}
}

space, err := fs.storageSpaceFromNode(ctx, root, root.InternalPath(), false, false)
space, err := fs.storageSpaceFromNode(ctx, root, false, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -215,21 +215,17 @@ func (fs *Decomposedfs) canCreateSpace(ctx context.Context, spaceID string) bool
return checkRes.Status.Code == v1beta11.Code_CODE_OK
}

// ReadSpaceAndNodeFromSpaceTypeLink reads a symlink and parses space and node id if the link has the correct format, eg:
// ReadSpaceAndNodeFromIndexLink reads a symlink and parses space and node id if the link has the correct format, eg:
// ../../spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51
// ../../spaces/4c/510ada-c86b-4815-8820-42cdf82c3d51/nodes/4c/51/0a/da/-c86b-4815-8820-42cdf82c3d51.T.2022-02-24T12:35:18.196484592Z
func ReadSpaceAndNodeFromSpaceTypeLink(path string) (string, string, error) {
link, err := os.Readlink(path)
if err != nil {
return "", "", err
}
// ../../spaces/sp/ace-id/nodes/sh/or/tn/od/eid
// 0 1 2 3 4 5 6 7 8 9 10
func ReadSpaceAndNodeFromIndexLink(link string) (string, string, error) {
// ../../../spaces/sp/ace-id/nodes/sh/or/tn/od/eid
// 0 1 2 3 4 5 6 7 8 9 10 11
parts := strings.Split(link, string(filepath.Separator))
if len(parts) != 11 || parts[0] != ".." || parts[1] != ".." || parts[2] != "spaces" || parts[5] != "nodes" {
if len(parts) != 12 || parts[0] != ".." || parts[1] != ".." || parts[2] != ".." || parts[3] != "spaces" || parts[6] != "nodes" {
return "", "", errtypes.InternalError("malformed link")
}
return strings.Join(parts[3:5], ""), strings.Join(parts[6:11], ""), nil
return strings.Join(parts[4:6], ""), strings.Join(parts[7:12], ""), nil
}

// ListStorageSpaces returns a list of StorageSpaces.
Expand All @@ -250,10 +246,10 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
var (
spaceID = spaceIDAny
nodeID = spaceIDAny
userID = ctxpkg.ContextMustGetUser(ctx).GetId().GetOpaqueId()
userID = spaceIDAny
)

spaceTypes := []string{}
spaceTypes := map[string]struct{}{}

for i := range filter {
switch filter[i].Type {
Expand All @@ -264,25 +260,28 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
case "+grant":
// TODO include grants
default:
spaceTypes = append(spaceTypes, filter[i].GetSpaceType())
spaceTypes[filter[i].GetSpaceType()] = struct{}{}
}
case provider.ListStorageSpacesRequest_Filter_TYPE_ID:
spaceID, nodeID, _ = storagespace.SplitID(filter[i].GetId().OpaqueId)
if strings.Contains(nodeID, "/") {
return []*provider.StorageSpace{}, nil
}

case provider.ListStorageSpacesRequest_Filter_TYPE_USER:
// TODO: refactor this to GetUserId() in cs3
userID = filter[i].GetUser()
userID = filter[i].GetUser().GetOpaqueId()
}
}
if len(spaceTypes) == 0 {
spaceTypes = []string{spaceTypeAny}
spaceTypes[spaceTypeAny] = struct{}{}
}

canListAllSpaces := fs.canListAllSpaces(ctx)

if userID != spaceTypeAny && !canListAllSpaces {
return nil, errtypes.PermissionDenied(fmt.Sprintf("user %s is not allowed to list spaces of other users", ctxpkg.ContextMustGetUser(ctx).GetId().GetOpaqueId()))
}

spaces := []*provider.StorageSpace{}
// build the glob path, eg.
// /path/to/root/spaces/{spaceType}/{spaceId}
Expand All @@ -300,28 +299,52 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// return empty list
return spaces, nil
}
space, err := fs.storageSpaceFromNode(ctx, n, n.InternalPath(), canListAllSpaces, unrestricted)
space, err := fs.storageSpaceFromNode(ctx, n, canListAllSpaces, unrestricted)
if err != nil {
return nil, err
}
// filter space types
for _, spaceType := range spaceTypes {
if spaceType == spaceTypeAny || spaceType == space.SpaceType {
spaces = append(spaces, space)
}
_, ok1 := spaceTypes[spaceTypeAny]
_, ok2 := spaceTypes[space.SpaceType]
if ok1 || ok2 {
spaces = append(spaces, space)
}

// TODO: filter user id
return spaces, nil
}

matches := []string{}
for _, spaceType := range spaceTypes {
path := filepath.Join(fs.o.Root, "spacetypes", spaceType, nodeID)
matches := map[string]struct{}{}

if userID != spaceTypeAny {
path := filepath.Join(fs.o.Root, "indexes", "by-user-id", userID, nodeID)
m, err := filepath.Glob(path)
if err != nil {
return nil, err
}
matches = append(matches, m...)
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
matches[link] = struct{}{}
}
}

if userID == spaceTypeAny {
for spaceType := range spaceTypes {
path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, nodeID)
m, err := filepath.Glob(path)
if err != nil {
return nil, err
}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
matches[link] = struct{}{}
}
}
}

// FIXME if the space does not exist try a node as the space root.
Expand All @@ -337,16 +360,16 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide

numShares := 0

for i := range matches {
for match := range matches {
var err error
// do not investigate flock files any further. They indicate file locks but are not relevant here.
if strings.HasSuffix(matches[i], ".flock") {
if strings.HasSuffix(match, ".flock") {
continue
}
// always read link in case storage space id != node id
spaceID, nodeID, err = ReadSpaceAndNodeFromSpaceTypeLink(matches[i])
spaceID, nodeID, err = ReadSpaceAndNodeFromIndexLink(match)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("match", matches[i]).Msg("could not read link, skipping")
appctx.GetLogger(ctx).Error().Err(err).Str("match", match).Msg("could not read link, skipping")
continue
}

Expand All @@ -360,25 +383,27 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
continue
}

spaceType := filepath.Base(filepath.Dir(matches[i]))
space, err := fs.storageSpaceFromNode(ctx, n, canListAllSpaces, unrestricted)
if err != nil {
if _, ok := err.(errtypes.IsPermissionDenied); !ok {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not convert to storage space")
}
continue
}

// FIXME type share evolved to grant on the edge branch ... make it configurable if the driver should support them or not for now ... ignore type share
if spaceType == spaceTypeShare {
if space.SpaceType == spaceTypeShare {
numShares++
// do not list shares as spaces for the owner
continue
}

// TODO apply more filters
space, err := fs.storageSpaceFromNode(ctx, n, matches[i], canListAllSpaces, unrestricted)
if err != nil {
if _, ok := err.(errtypes.IsPermissionDenied); !ok {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not convert to storage space")
}
continue
_, ok1 := spaceTypes[spaceTypeAny]
_, ok2 := spaceTypes[space.SpaceType]
if ok1 || ok2 {
spaces = append(spaces, space)
}
spaces = append(spaces, space)

}
// if there are no matches (or they happened to be spaces for the owner) and the node is a child return a space
if len(matches) <= numShares && nodeID != spaceID {
Expand All @@ -388,7 +413,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
return nil, err
}
if n.Exists {
space, err := fs.storageSpaceFromNode(ctx, n, n.InternalPath(), canListAllSpaces, unrestricted)
space, err := fs.storageSpaceFromNode(ctx, n, canListAllSpaces, unrestricted)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -496,7 +521,7 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
}

// send back the updated data from the storage
updatedSpace, err := fs.storageSpaceFromNode(ctx, node, node.InternalPath(), false, false)
updatedSpace, err := fs.storageSpaceFromNode(ctx, node, false, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -570,19 +595,20 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str
return nil
}
// create user index dir
// TODO: pathify userID
if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-user-id", userID), 0700); err != nil {
return err
}

err := os.Symlink("../../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "indexes/by-user-id", userID, spaceID))
if err != nil {
if isAlreadyExists(err) {
appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("indexes/by-user-id", userID).Msg("symlink already exists")
appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("user-id", userID).Msg("symlink already exists")
// FIXME: is it ok to wipe this err if the symlink already exists?
err = nil
} else {
// TODO how should we handle error cases here?
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("indexes/by-user-id", userID).Msg("could not create symlink")
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("user-id", userID).Msg("could not create symlink")
}
}
return nil
Expand All @@ -595,12 +621,12 @@ func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType stri
return nil
}
// create space type dir
if err := os.MkdirAll(filepath.Join(fs.o.Root, "spacetypes", spaceType), 0700); err != nil {
if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-type", spaceType), 0700); err != nil {
return err
}

// link space in spacetypes
err := os.Symlink("../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "spacetypes", spaceType, spaceID))
err := os.Symlink("../../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "spacetypes", spaceType, spaceID))
if err != nil {
if isAlreadyExists(err) {
appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("spacetype", spaceType).Msg("symlink already exists")
Expand All @@ -615,7 +641,7 @@ func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType stri
return err
}

func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, nodePath string, canListAllSpaces bool, unrestricted bool) (*provider.StorageSpace, error) {
func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, canListAllSpaces bool, unrestricted bool) (*provider.StorageSpace, error) {
user := ctxpkg.ContextMustGetUser(ctx)
if !canListAllSpaces || !unrestricted {
ok, err := node.NewPermissions(fs.lu).HasPermission(ctx, n, func(p *provider.ResourcePermissions) bool {
Expand Down Expand Up @@ -723,7 +749,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
Seconds: uint64(un / 1000000000),
Nanos: uint32(un % 1000000000),
}
} else if fi, err := os.Stat(nodePath); err == nil {
} else if fi, err := os.Stat(n.InternalPath()); err == nil {
// fall back to stat mtime
tmtime = fi.ModTime()
un := fi.ModTime().UnixNano()
Expand All @@ -742,7 +768,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
Value: []byte(etag),
}

spaceAttributes, err := xattrs.All(nodePath)
spaceAttributes, err := xattrs.All(n.InternalPath())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/spaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = Describe("Spaces", func() {
err := os.Symlink(link, filepath.Join(tmpdir, "link"))
Expect(err).ToNot(HaveOccurred())

space, node, err := decomposedfs.ReadSpaceAndNodeFromSpaceTypeLink(filepath.Join(tmpdir, "link"))
space, node, err := decomposedfs.ReadSpaceAndNodeFromIndexLink(filepath.Join(tmpdir, "link"))
if shouldErr {
Expect(err).To(HaveOccurred())
} else {
Expand Down
Loading

0 comments on commit cd399c4

Please sign in to comment.