
Commit

Limit the number of concurrent goroutines for listing directories (#3740)

* Limit the number of concurrent goroutines for listing directories

* Adapt tests

* Fix linter issues

* Add changelog
aduffeck authored Mar 21, 2023
1 parent d521bbf commit 8c07e9c
Showing 7 changed files with 138 additions and 60 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/limit-concurrency.md
@@ -0,0 +1,5 @@
Enhancement: Limit concurrency in decomposedfs

The number of concurrent goroutines used for listing directories in decomposedfs is now limited to a configurable number.

https://github.com/cs3org/reva/pull/3740
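For context, the change replaces an unbounded "one goroutine per child" fan-out with a bounded worker pool built on errgroup: a dispatcher goroutine feeds a channel, at most MaxConcurrency workers drain it, and results are collected once the group is done. Below is a minimal, self-contained sketch of that pattern; it is illustrative only — the processAll helper, the string items and the strings.ToUpper stand-in work are invented for this example and are not reva code.

// bounded_listing.go — sketch of the bounded worker-pool pattern used by this PR.
package main

import (
    "context"
    "fmt"
    "strings"

    "golang.org/x/sync/errgroup"
)

// processAll runs at most maxConcurrency workers over items and collects the results.
func processAll(ctx context.Context, items []string, maxConcurrency int) ([]string, error) {
    numWorkers := maxConcurrency
    if len(items) < numWorkers {
        numWorkers = len(items)
    }
    if numWorkers < 1 {
        numWorkers = 1 // options.New guards against <= 0 in the PR; kept here for safety
    }

    work := make(chan string, len(items))
    results := make(chan string, len(items))

    g, ctx := errgroup.WithContext(ctx)

    // Distribute work.
    g.Go(func() error {
        defer close(work)
        for _, it := range items {
            select {
            case work <- it:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return nil
    })

    // A fixed number of workers drain the queue.
    for i := 0; i < numWorkers; i++ {
        g.Go(func() error {
            for it := range work {
                select {
                case results <- strings.ToUpper(it): // stand-in for the real per-item work
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return nil
        })
    }

    // Close results once dispatcher and workers are done; the error is checked below.
    go func() {
        _ = g.Wait()
        close(results)
    }()

    out := make([]string, 0, len(items))
    for r := range results {
        out = append(out, r)
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return out, nil
}

func main() {
    out, err := processAll(context.Background(), []string{"a", "b", "c"}, 2)
    fmt.Println(out, err)
}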
74 changes: 57 additions & 17 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -115,7 +115,7 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

-tp := tree.New(o.Root, o.TreeTimeAccounting, o.TreeSizeAccounting, lu, bs)
+tp := tree.New(lu, bs, o)

permissionsClient, err := pool.GetPermissionsClient(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode))
if err != nil {
@@ -782,29 +782,69 @@ func (fs *Decomposedfs) ListFolder(ctx context.Context, ref *provider.Reference,
return nil, errtypes.NotFound(f)
}

-var children []*node.Node
-children, err = fs.tp.ListFolder(ctx, n)
+children, err := fs.tp.ListFolder(ctx, n)
if err != nil {
return nil, err
}
-finfos := make([]*provider.ResourceInfo, len(children))
-eg, ctx := errgroup.WithContext(ctx)
-for i := range children {
-pos := i
-eg.Go(func() error {
-np := rp
-// add this childs permissions
-pset, _ := n.PermissionSet(ctx)
-node.AddPermissions(&np, &pset)
-ri, err := children[pos].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref))
-if err != nil {
-return errtypes.InternalError(err.Error())
-}
-finfos[pos] = ri
-return nil
-})
-}
-if err := eg.Wait(); err != nil {
+
+numWorkers := fs.o.MaxConcurrency
+if len(children) < numWorkers {
+numWorkers = len(children)
+}
+work := make(chan *node.Node, len(children))
+results := make(chan *provider.ResourceInfo, len(children))
+
+g, ctx := errgroup.WithContext(ctx)
+
+// Distribute work
+g.Go(func() error {
+defer close(work)
+for _, child := range children {
+select {
+case work <- child:
+case <-ctx.Done():
+return ctx.Err()
+}
+}
+return nil
+})
+
+// Spawn workers that'll concurrently work the queue
+for i := 0; i < numWorkers; i++ {
+g.Go(func() error {
+for child := range work {
+np := rp
+// add this childs permissions
+pset, _ := n.PermissionSet(ctx)
+node.AddPermissions(&np, &pset)
+ri, err := child.AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref))
+if err != nil {
+return errtypes.InternalError(err.Error())
+}
+select {
+case results <- ri:
+case <-ctx.Done():
+return ctx.Err()
+}
+}
+return nil
+})
+}
+
+// Wait for things to settle down, then close results chan
+go func() {
+_ = g.Wait() // error is checked later
+close(results)
+}()
+
+finfos := make([]*provider.ResourceInfo, len(children))
+i := 0
+for fi := range results {
+finfos[i] = fi
+i++
+}
+
+if err := g.Wait(); err != nil {
return nil, err
}

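A note on the structure above: the goroutine wrapping the first g.Wait() exists only to close the results channel once the dispatcher and every worker have returned, which is what lets the collection loop over results terminate; its error is deliberately discarded because the same group is waited on again after the loop, and that second call is the one whose error is returned. Both channels are buffered to len(children), so neither the dispatcher nor the workers can block on a slow collector.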
5 changes: 5 additions & 0 deletions pkg/storage/utils/decomposedfs/options/options.go
@@ -68,6 +68,7 @@ type Options struct {

MaxAcquireLockCycles int `mapstructure:"max_acquire_lock_cycles"`
LockCycleDurationFactor int `mapstructure:"lock_cycle_duration_factor"`
+MaxConcurrency int `mapstructure:"max_concurrency"`

MaxQuota uint64 `mapstructure:"max_quota"`
}
@@ -140,5 +141,9 @@ func New(m map[string]interface{}) (*Options, error) {
}
}

+if o.MaxConcurrency <= 0 {
+o.MaxConcurrency = 100
+}

return o, nil
}
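A hedged usage sketch for the new knob. Only the max_concurrency key, the fallback to 100 and the options import path come from this diff; passing a nearly empty map to options.New is an assumption for illustration — a real configuration sets the other decomposedfs options as well.

package main

import (
    "fmt"

    "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
)

func main() {
    // Values <= 0 (or an absent key) fall back to the default of 100.
    o, err := options.New(map[string]interface{}{
        "max_concurrency": 8, // cap for the directory-listing worker pools
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(o.MaxConcurrency) // 8
}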
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/testhelpers/helpers.go
@@ -156,7 +156,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
permissions := &mocks.PermissionsChecker{}
cs3permissionsclient := &mocks.CS3PermissionsClient{}
bs := &treemocks.Blobstore{}
-tree := tree.New(o.Root, true, true, lu, bs)
+tree := tree.New(lu, bs, o)
fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, cs3permissionsclient), tree, nil)
if err != nil {
return nil, err
108 changes: 68 additions & 40 deletions pkg/storage/utils/decomposedfs/tree/tree.go
@@ -39,6 +39,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
+"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
@@ -76,33 +77,29 @@ type Tree struct {
lookup PathLookup
blobstore Blobstore

-root string
-treeSizeAccounting bool
-treeTimeAccounting bool
+options *options.Options
}

// PermissionCheckFunc defined a function used to check resource permissions
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool

// New returns a new instance of Tree
-func New(root string, tta bool, tsa bool, lu PathLookup, bs Blobstore) *Tree {
+func New(lu PathLookup, bs Blobstore, o *options.Options) *Tree {
return &Tree{
-lookup: lu,
-blobstore: bs,
-root: root,
-treeTimeAccounting: tta,
-treeSizeAccounting: tsa,
+lookup: lu,
+blobstore: bs,
+options: o,
}
}

// Setup prepares the tree structure
func (t *Tree) Setup() error {
// create data paths for internal layout
dataPaths := []string{
-filepath.Join(t.root, "spaces"),
+filepath.Join(t.options.Root, "spaces"),
// notes contain symlinks from nodes/<u-u-i-d>/uploads/<uploadid> to ../../uploads/<uploadid>
// better to keep uploads on a fast / volatile storage before a workflow finally moves them to the nodes dir
-filepath.Join(t.root, "uploads"),
+filepath.Join(t.options.Root, "uploads"),
}
for _, v := range dataPaths {
err := os.MkdirAll(v, 0700)
@@ -328,42 +325,73 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, error) {
if err != nil {
return nil, err
}
-nodes := make([]*node.Node, len(names))
-
-eg, ctx := errgroup.WithContext(ctx)
-for i := range names {
-pos := i
-eg.Go(func() error {
-nodeID, err := readChildNodeFromLink(filepath.Join(dir, names[pos]))
-if err != nil {
-return err
-}
-
-child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n, true)
-if err != nil {
-return err
-}
-
-// prevent listing denied resources
-if !child.IsDenied(ctx) {
-if child.SpaceRoot == nil {
-child.SpaceRoot = n.SpaceRoot
-}
-nodes[pos] = child
-}
-return nil
-})
-}
-if err := eg.Wait(); err != nil {
-return nil, err
-}
+numWorkers := t.options.MaxConcurrency
+if len(names) < numWorkers {
+numWorkers = len(names)
+}
+work := make(chan string)
+results := make(chan *node.Node)
+
+g, ctx := errgroup.WithContext(ctx)
+
+// Distribute work
+g.Go(func() error {
+defer close(work)
+for _, name := range names {
+select {
+case work <- name:
+case <-ctx.Done():
+return ctx.Err()
+}
+}
+return nil
+})
+
+// Spawn workers that'll concurrently work the queue
+for i := 0; i < numWorkers; i++ {
+g.Go(func() error {
+for name := range work {
+nodeID, err := readChildNodeFromLink(filepath.Join(dir, name))
+if err != nil {
+return err
+}
+
+child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n, true)
+if err != nil {
+return err
+}
+
+// prevent listing denied resources
+if !child.IsDenied(ctx) {
+if child.SpaceRoot == nil {
+child.SpaceRoot = n.SpaceRoot
+}
+select {
+case results <- child:
+case <-ctx.Done():
+return ctx.Err()
+}
+}
+}
+return nil
+})
+}
+
+// Wait for things to settle down, then close results chan
+go func() {
+_ = g.Wait() // error is checked later
+close(results)
+}()

retNodes := []*node.Node{}
-for _, n := range nodes {
-if n != nil {
-retNodes = append(retNodes, n)
-}
-}
+for n := range results {
+retNodes = append(retNodes, n)
+}
+
+if err := g.Wait(); err != nil {
+return nil, err
+}
+
return retNodes, nil
}
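Compared to the decomposedfs.ListFolder version above, this tree-level pool uses unbuffered work and results channels, so workers only make progress while the collection loop is draining results, and children whose grants deny access are skipped before being sent — the returned slice can therefore be shorter than the raw directory listing.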

@@ -402,7 +430,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
deletionTime := time.Now().UTC().Format(time.RFC3339Nano)

// Prepare the trash
-trashLink := filepath.Join(t.root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
+trashLink := filepath.Join(t.options.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil {
// Roll back changes
_ = n.RemoveXattr(prefixes.TrashOriginAttr)
@@ -642,7 +670,7 @@ func (t *Tree) removeNode(path string, n *node.Node) error {
// Propagate propagates changes to the root of the tree
func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) {
sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
-if !t.treeTimeAccounting && !t.treeSizeAccounting {
+if !t.options.TreeTimeAccounting && !t.options.TreeSizeAccounting {
// no propagation enabled
sublog.Debug().Msg("propagation disabled")
return
@@ -671,7 +699,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
return nil
}

-if t.treeTimeAccounting || (t.treeSizeAccounting && sizeDiff != 0) {
+if t.options.TreeTimeAccounting || (t.options.TreeSizeAccounting && sizeDiff != 0) {
attrs := node.Attributes{}

var f *lockedfile.File
@@ -696,7 +724,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
}
}()

-if t.treeTimeAccounting {
+if t.options.TreeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false

Expand Down Expand Up @@ -731,7 +759,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
}

// size accounting
-if t.treeSizeAccounting && sizeDiff != 0 {
+if t.options.TreeSizeAccounting && sizeDiff != 0 {
var newSize uint64

// read treesize
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/upload_async_test.go
@@ -105,7 +105,7 @@ var _ = Describe("Async file uploads", Ordered, func() {

// setup fs
pub, con = make(chan interface{}), make(chan interface{})
-tree := tree.New(o.Root, true, true, lu, bs)
+tree := tree.New(lu, bs, o)
fs, err = New(o, lu, NewPermissions(permissions, cs3permissionsclient), tree, stream.Chan{pub, con})
Expect(err).ToNot(HaveOccurred())

2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/upload_test.go
@@ -119,7 +119,7 @@ var _ = Describe("File uploads", func() {
AddGrant: true,
}, nil).Times(1)
var err error
-tree := tree.New(o.Root, true, true, lu, bs)
+tree := tree.New(lu, bs, o)
fs, err = decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, cs3permissionsclient), tree, nil)
Expect(err).ToNot(HaveOccurred())

