Skip to content

Commit

Permalink
Merge pull request cs3org#3933 from butonic/jsoncs3-concurrency
Browse files Browse the repository at this point in the history
concurrently invalidate mtime cache in jsoncs3 share manager
  • Loading branch information
kobergj authored Jun 6, 2023
2 parents fe2cb3a + aa7c938 commit 0c2e409
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 47 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/jsoncs3-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bugfix: concurrently invalidate mtime cache in jsoncs3 share manager

https://github.com/cs3org/reva/pull/3933
130 changes: 93 additions & 37 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
Expand Down Expand Up @@ -115,6 +116,7 @@ func init() {

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
Expand Down Expand Up @@ -145,6 +147,8 @@ type Manager struct {

initialized bool

MaxConcurrency int

gateway gatewayv1beta1.GatewayAPIClient
eventStream events.Stream
}
Expand Down Expand Up @@ -205,11 +209,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
}
}

return New(s, gc, c.CacheTTL, es)
return New(s, gc, c.CacheTTL, es, c.MaxConcurrency)
}

// New returns a new manager instance.
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream) (*Manager, error) {
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{
Cache: providercache.New(s, ttl),
Expand All @@ -219,6 +223,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
storage: s,
gateway: gc,
eventStream: es,
MaxConcurrency: maxconcurrency,
}, nil
}

Expand Down Expand Up @@ -703,7 +708,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
m.Lock()
defer m.Unlock()

var rss []*collaboration.ReceivedShare
user := ctxpkg.ContextMustGetUser(ctx)

ssids := map[string]*receivedsharecache.Space{}
Expand Down Expand Up @@ -750,46 +754,98 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
}

for ssid, rspace := range ssids {
storageID, spaceID, _ := shareid.Decode(ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
for shareID, state := range rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
continue
numWorkers := m.MaxConcurrency
if numWorkers == 0 || len(ssids) < numWorkers {
numWorkers = len(ssids)
}

type w struct {
ssid string
rspace *receivedsharecache.Space
}
work := make(chan w)
results := make(chan *collaboration.ReceivedShare)

g, ctx := errgroup.WithContext(ctx)

// Distribute work
g.Go(func() error {
defer close(work)
for ssid, rspace := range ssids {
select {
case work <- w{ssid, rspace}:
case <-ctx.Done():
return ctx.Err()
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
continue
}
for shareID, state := range w.rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}

if share.IsGrantedToUser(s, user) {
if share.MatchesFiltersWithState(s, state.State, filters) {
rs := &collaboration.ReceivedShare{
Share: s,
State: state.State,
MountPoint: state.MountPoint,
if share.IsGrantedToUser(s, user) {
if share.MatchesFiltersWithState(s, state.State, filters) {
rs := &collaboration.ReceivedShare{
Share: s,
State: state.State,
MountPoint: state.MountPoint,
}
select {
case results <- rs:
case <-ctx.Done():
return ctx.Err()
}
}
}
rss = append(rss, rs)
}
}
}
return nil
})
}

// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

rss := []*collaboration.ReceivedShare{}
for n := range results {
rss = append(rss, n)
}

if err := g.Wait(); err != nil {
return nil, err
}

return rss, nil
Expand Down
20 changes: 10 additions & 10 deletions pkg/share/manager/jsoncs3/jsoncs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(err).ToNot(HaveOccurred())

client = &mocks.GatewayAPIClient{}
m, err = jsoncs3.New(storage, client, 0, nil)
m, err = jsoncs3.New(storage, client, 0, nil, 0)
Expect(err).ToNot(HaveOccurred())
})

Expand Down Expand Up @@ -250,7 +250,7 @@ var _ = Describe("Jsoncs3", func() {
})
Expect(s).ToNot(BeNil())

m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s = shareBykey(&collaboration.ShareKey{
Expand Down Expand Up @@ -444,7 +444,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("loads the cache when it doesn't have an entry", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s, err := m.GetShare(ctx, shareRef)
Expand Down Expand Up @@ -504,7 +504,7 @@ var _ = Describe("Jsoncs3", func() {
})
Expect(err).ToNot(HaveOccurred())

m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s, err := m.GetShare(ctx, &collaboration.ShareReference{
Expand Down Expand Up @@ -617,7 +617,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(us).ToNot(BeNil())
Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue())

m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s = shareBykey(&collaboration.ShareKey{
Expand Down Expand Up @@ -748,7 +748,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncronizes the user received cache before listing", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{})
Expand Down Expand Up @@ -816,7 +816,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncronizes the group received cache before listing", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{})
Expand Down Expand Up @@ -860,7 +860,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncs the cache", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down Expand Up @@ -894,7 +894,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncs the cache", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down Expand Up @@ -1017,7 +1017,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(err).ToNot(HaveOccurred())
Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED))

m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down

0 comments on commit 0c2e409

Please sign in to comment.