diff --git a/changelog/unreleased/concurrent-trashbin.md b/changelog/unreleased/concurrent-trashbin.md new file mode 100644 index 0000000000..ac732411c9 --- /dev/null +++ b/changelog/unreleased/concurrent-trashbin.md @@ -0,0 +1,8 @@ +Enhancement: Handle trashbin file listings concurrently + +We now use a concurrent walker to list files in the trashbin. This +improves performance when listing files in the trashbin. + +https://github.com/cs3org/reva/pull/4377 +https://github.com/cs3org/reva/pull/4374 +https://github.com/owncloud/ocis/issues/7844 \ No newline at end of file diff --git a/pkg/storage/utils/decomposedfs/recycle.go b/pkg/storage/utils/decomposedfs/recycle.go index eeb153777a..72c2b22640 100644 --- a/pkg/storage/utils/decomposedfs/recycle.go +++ b/pkg/storage/utils/decomposedfs/recycle.go @@ -27,6 +27,9 @@ import ( "strings" "time" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" @@ -35,7 +38,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/pkg/errors" ) // Recycle items are stored inside the node folder and start with the uuid of the deleted node. @@ -214,66 +216,111 @@ func readTrashLink(path string) (string, string, string, error) { func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) { log := appctx.GetLogger(ctx) - items := make([]*provider.RecycleItem, 0) - trashRoot := fs.getRecycleRoot(spaceID) - matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*") + + subTrees, err := filepath.Glob(trashRoot + "/*") if err != nil { return nil, err } - for _, itemPath := range matches { - nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") - continue - } + numWorkers := fs.o.MaxConcurrency + if len(subTrees) < numWorkers { + numWorkers = len(subTrees) + } - md, err := os.Stat(nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") - continue - } + work := make(chan string, len(subTrees)) + results := make(chan *provider.RecycleItem, len(subTrees)) - attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") - continue - } + g, ctx := errgroup.WithContext(ctx) - nodeType := fs.lu.TypeFromPath(ctx, nodePath) - if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") - continue - } - - item := &provider.RecycleItem{ - Type: nodeType, - Size: uint64(md.Size()), - Key: nodeID, - } - if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { - item.DeletionTime = &types.Timestamp{ - Seconds: uint64(deletionTime.Unix()), - // TODO nanos + // Distribute work + g.Go(func() error { + defer close(work) + for _, itemPath := range subTrees { + select { + case work <- itemPath: + case <-ctx.Done(): + return ctx.Err() } - } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for subTree := range work { + matches, err := filepath.Glob(subTree + "/*/*/*/*") + if err != nil { + return err + } + + for _, itemPath := range matches { + nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") + continue + } + + md, err := os.Stat(nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") + continue + } + + attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") + continue + } + + nodeType := fs.lu.TypeFromPath(ctx, nodePath) + if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") + continue + } + + item := &provider.RecycleItem{ + Type: nodeType, + Size: uint64(md.Size()), + Key: nodeID, + } + if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { + item.DeletionTime = &types.Timestamp{ + Seconds: uint64(deletionTime.Unix()), + // TODO nanos + } + } else { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + } + + // lookup origin path in extended attributes + if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { + item.Ref = &provider.Reference{Path: string(attr)} + } else { + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + } + select { + case results <- item: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil + }) + } - // lookup origin path in extended attributes - if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { - item.Ref = &provider.Reference{Path: string(attr)} - } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping") - continue - } - // TODO filter results by permission ... on the original parent? or the trashed node? - // if it were on the original parent it would be possible to see files that were trashed before the current user got access - // so -> check the trash node itself - // hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items. - // for now we can only really check if the current user is the owner - items = append(items, item) + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + // Collect results + items := []*provider.RecycleItem{} + for ri := range results { + items = append(items, ri) } return items, nil } diff --git a/pkg/storage/utils/decomposedfs/recycle_test.go b/pkg/storage/utils/decomposedfs/recycle_test.go index f457992ca5..172ba12bbe 100644 --- a/pkg/storage/utils/decomposedfs/recycle_test.go +++ b/pkg/storage/utils/decomposedfs/recycle_test.go @@ -170,12 +170,10 @@ var _ = Describe("Recycle", func() { itemsA, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) Expect(len(itemsA)).To(Equal(2)) - itemsB, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) Expect(len(itemsB)).To(Equal(2)) - - Expect(itemsA).To(Equal(itemsB)) + Expect(itemsA).To(ConsistOf(itemsB)) }) It("they can be permanently deleted by the other user", func() {