From 8185929dd0c30085590fe6e5566d98b4e6ccdee8 Mon Sep 17 00:00:00 2001 From: Christian Richter Date: Wed, 29 Nov 2023 14:32:29 +0100 Subject: [PATCH 1/5] Handle trashbin file listing concurrently MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: André Duffeck Signed-off-by: Christian Richter --- pkg/storage/utils/decomposedfs/recycle.go | 135 ++++++++++++++-------- 1 file changed, 88 insertions(+), 47 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/recycle.go b/pkg/storage/utils/decomposedfs/recycle.go index eeb153777a..517914eeca 100644 --- a/pkg/storage/utils/decomposedfs/recycle.go +++ b/pkg/storage/utils/decomposedfs/recycle.go @@ -20,6 +20,7 @@ package decomposedfs import ( "context" + "golang.org/x/sync/errgroup" iofs "io/fs" "os" "path/filepath" @@ -214,7 +215,6 @@ func readTrashLink(path string) (string, string, string, error) { func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) { log := appctx.GetLogger(ctx) - items := make([]*provider.RecycleItem, 0) trashRoot := fs.getRecycleRoot(spaceID) matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*") @@ -222,58 +222,99 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p return nil, err } - for _, itemPath := range matches { - nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") - continue - } + numWorkers := fs.o.MaxConcurrency + if len(matches) < numWorkers { + numWorkers = len(matches) + } - md, err := os.Stat(nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") - continue - } + work := make(chan string, len(matches)) + results := make(chan *provider.RecycleItem, len(matches)) - attrs, err := 
fs.lu.MetadataBackend().All(ctx, nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") - continue - } + g, ctx := errgroup.WithContext(ctx) - nodeType := fs.lu.TypeFromPath(ctx, nodePath) - if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") - continue - } - - item := &provider.RecycleItem{ - Type: nodeType, - Size: uint64(md.Size()), - Key: nodeID, - } - if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { - item.DeletionTime = &types.Timestamp{ - Seconds: uint64(deletionTime.Unix()), - // TODO nanos + // Distribute work + g.Go(func() error { + defer close(work) + for _, itemPath := range matches { + select { + case work <- itemPath: + case <-ctx.Done(): + return ctx.Err() } - } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for itemPath := range work { + nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") + continue + } + + md, err := os.Stat(nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") + continue + } + + attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") + continue + } + + nodeType 
:= fs.lu.TypeFromPath(ctx, nodePath) + if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") + continue + } + + item := &provider.RecycleItem{ + Type: nodeType, + Size: uint64(md.Size()), + Key: nodeID, + } + if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { + item.DeletionTime = &types.Timestamp{ + Seconds: uint64(deletionTime.Unix()), + // TODO nanos + } + } else { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + } + + // lookup origin path in extended attributes + if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { + item.Ref = &provider.Reference{Path: string(attr)} + } else { + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + } + select { + case results <- item: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } - // lookup origin path in extended attributes - if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { - item.Ref = &provider.Reference{Path: string(attr)} - } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping") - continue - } - // TODO filter results by permission ... on the original parent? or the trashed node? - // if it were on the original parent it would be possible to see files that were trashed before the current user got access - // so -> check the trash node itself - // hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items. 
- // for now we can only really check if the current user is the owner - items = append(items, item) + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + // Collect results + items := make([]*provider.RecycleItem, len(matches)) + i := 0 + for ri := range results { + items[i] = ri + i++ } return items, nil } From ee77f7b76e1750e639d93920bd5d50b1e010cd8f Mon Sep 17 00:00:00 2001 From: Christian Richter Date: Wed, 29 Nov 2023 14:36:06 +0100 Subject: [PATCH 2/5] Add changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: André Duffeck Signed-off-by: Christian Richter --- changelog/unreleased/concurrent-trashbin.md | 7 +++++++ pkg/storage/utils/decomposedfs/recycle.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 changelog/unreleased/concurrent-trashbin.md diff --git a/changelog/unreleased/concurrent-trashbin.md b/changelog/unreleased/concurrent-trashbin.md new file mode 100644 index 0000000000..fda24531e5 --- /dev/null +++ b/changelog/unreleased/concurrent-trashbin.md @@ -0,0 +1,7 @@ +Enhancement: Handle trashbin file listings concurrently + +We now use a concurrent walker to list files in the trashbin. This +improves performance when listing files in the trashbin. 
+ +https://github.com/cs3org/reva/pull/4374 +https://github.com/owncloud/ocis/issues/7844 \ No newline at end of file diff --git a/pkg/storage/utils/decomposedfs/recycle.go b/pkg/storage/utils/decomposedfs/recycle.go index 517914eeca..77ceb91cb3 100644 --- a/pkg/storage/utils/decomposedfs/recycle.go +++ b/pkg/storage/utils/decomposedfs/recycle.go @@ -20,7 +20,6 @@ package decomposedfs import ( "context" - "golang.org/x/sync/errgroup" iofs "io/fs" "os" "path/filepath" @@ -37,6 +36,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) // Recycle items are stored inside the node folder and start with the uuid of the deleted node. From f18ddee2bb7f3ad704216f0b871eafda3ae56797 Mon Sep 17 00:00:00 2001 From: Christian Richter Date: Wed, 29 Nov 2023 15:00:28 +0100 Subject: [PATCH 3/5] Fix test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: André Duffeck Signed-off-by: Christian Richter --- pkg/storage/utils/decomposedfs/recycle_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/recycle_test.go b/pkg/storage/utils/decomposedfs/recycle_test.go index f457992ca5..172ba12bbe 100644 --- a/pkg/storage/utils/decomposedfs/recycle_test.go +++ b/pkg/storage/utils/decomposedfs/recycle_test.go @@ -170,12 +170,10 @@ var _ = Describe("Recycle", func() { itemsA, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) Expect(len(itemsA)).To(Equal(2)) - itemsB, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/") Expect(err).ToNot(HaveOccurred()) Expect(len(itemsB)).To(Equal(2)) - - Expect(itemsA).To(Equal(itemsB)) + Expect(itemsA).To(ConsistOf(itemsB)) }) It("they can be permanently deleted by the other user", func() { From 
f3d8a24e1c0eb3aa82e14247227287420c2092d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 30 Nov 2023 09:05:48 +0100 Subject: [PATCH 4/5] Implemented concurrent processing for recycling sub-trees MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christian Richter Signed-off-by: André Duffeck --- pkg/storage/utils/decomposedfs/recycle.go | 114 ++++++++++++---------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/recycle.go b/pkg/storage/utils/decomposedfs/recycle.go index 77ceb91cb3..72c2b22640 100644 --- a/pkg/storage/utils/decomposedfs/recycle.go +++ b/pkg/storage/utils/decomposedfs/recycle.go @@ -27,6 +27,9 @@ import ( "strings" "time" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" @@ -35,8 +38,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) // Recycle items are stored inside the node folder and start with the uuid of the deleted node. 
@@ -215,27 +216,27 @@ func readTrashLink(path string) (string, string, string, error) { func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) { log := appctx.GetLogger(ctx) - trashRoot := fs.getRecycleRoot(spaceID) - matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*") + + subTrees, err := filepath.Glob(trashRoot + "/*") if err != nil { return nil, err } numWorkers := fs.o.MaxConcurrency - if len(matches) < numWorkers { - numWorkers = len(matches) + if len(subTrees) < numWorkers { + numWorkers = len(subTrees) } - work := make(chan string, len(matches)) - results := make(chan *provider.RecycleItem, len(matches)) + work := make(chan string, len(subTrees)) + results := make(chan *provider.RecycleItem, len(subTrees)) g, ctx := errgroup.WithContext(ctx) // Distribute work g.Go(func() error { defer close(work) - for _, itemPath := range matches { + for _, itemPath := range subTrees { select { case work <- itemPath: case <-ctx.Done(): @@ -248,55 +249,62 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p // Spawn workers that'll concurrently work the queue for i := 0; i < numWorkers; i++ { g.Go(func() error { - for itemPath := range work { - nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + for subTree := range work { + matches, err := filepath.Glob(subTree + "/*/*/*/*") if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") - continue + return err } - md, err := os.Stat(nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") - continue - } + for _, itemPath := range matches { + nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping") + continue + } 
- attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) - if err != nil { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") - continue - } + md, err := os.Stat(nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping") + continue + } - nodeType := fs.lu.TypeFromPath(ctx, nodePath) - if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") - continue - } + attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath) + if err != nil { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping") + continue + } - item := &provider.RecycleItem{ - Type: nodeType, - Size: uint64(md.Size()), - Key: nodeID, - } - if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { - item.DeletionTime = &types.Timestamp{ - Seconds: uint64(deletionTime.Unix()), - // TODO nanos + nodeType := fs.lu.TypeFromPath(ctx, nodePath) + if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping") + continue } - } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") - } - // lookup origin path in extended attributes - if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { - item.Ref = &provider.Reference{Path: string(attr)} - } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") - } - select { - case 
results <- item: - case <-ctx.Done(): - return ctx.Err() + item := &provider.RecycleItem{ + Type: nodeType, + Size: uint64(md.Size()), + Key: nodeID, + } + if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil { + item.DeletionTime = &types.Timestamp{ + Seconds: uint64(deletionTime.Unix()), + // TODO nanos + } + } else { + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + } + + // lookup origin path in extended attributes + if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { + item.Ref = &provider.Reference{Path: string(attr)} + } else { + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + } + select { + case results <- item: + case <-ctx.Done(): + return ctx.Err() + } } } return nil @@ -310,11 +318,9 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p }() // Collect results - items := make([]*provider.RecycleItem, len(matches)) - i := 0 + items := []*provider.RecycleItem{} for ri := range results { - items[i] = ri - i++ + items = append(items, ri) } return items, nil } From 432001012d8bd039e6b0d74012e3c480d8bbaa36 Mon Sep 17 00:00:00 2001 From: Christian Richter Date: Thu, 30 Nov 2023 11:15:41 +0100 Subject: [PATCH 5/5] update changelog Signed-off-by: Christian Richter --- changelog/unreleased/concurrent-trashbin.md | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog/unreleased/concurrent-trashbin.md b/changelog/unreleased/concurrent-trashbin.md index fda24531e5..ac732411c9 100644 --- a/changelog/unreleased/concurrent-trashbin.md +++ b/changelog/unreleased/concurrent-trashbin.md @@ -3,5 +3,6 @@ Enhancement: Handle trashbin file listings concurrently We now use a concurrent walker to list files in the trashbin. This improves performance when listing files in the trashbin. 
+https://github.com/cs3org/reva/pull/4377 https://github.com/cs3org/reva/pull/4374 https://github.com/owncloud/ocis/issues/7844 \ No newline at end of file