Skip to content

Commit

Permalink
Merge pull request cs3org#4983 from aduffeck/cleanup-stale-shares
Browse files Browse the repository at this point in the history
Cleanup stale shares
  • Loading branch information
butonic authored Dec 3, 2024
2 parents d9b2ffa + 7f574e4 commit 17f3395
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 3 deletions.
1 change: 1 addition & 0 deletions changelog/unreleased/delete-stale-shares.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ Bugfix: Delete stale shares in the jsoncs3 share manager

The jsoncs3 share manager now properly deletes all references to removed shares and shares that belong to a space that was deleted

https://github.com/cs3org/reva/pull/4983
https://github.com/cs3org/reva/pull/4975
56 changes: 55 additions & 1 deletion pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,13 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
for shareID, state := range w.rspace.States {
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true)
if err != nil || s == nil {
if err != nil {
sublogr.Error().Err(err).Msg("could not retrieve share")
continue
}
if s == nil {
sublogr.Warn().Str("shareid", shareID).Msg("share not found. cleaning up")
_ = m.UserReceivedStates.Remove(ctx, user.Id.OpaqueId, w.ssid, shareID)
continue
}
sublogr = sublogr.With().Str("shareid", shareID).Logger()
Expand Down Expand Up @@ -1227,3 +1233,51 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipS

return eg.Wait()
}

func (m *Manager) CleanupStaleShares(ctx context.Context) {
log := appctx.GetLogger(ctx)

if err := m.initialize(ctx); err != nil {
return
}

// list all shares
providers, err := m.Cache.All(ctx)
if err != nil {
log.Error().Err(err).Msg("error listing all shares")
return
}

client, err := m.gatewaySelector.Next()
if err != nil {
log.Error().Err(err).Msg("could not get gateway client")
}

providers.Range(func(storage string, spaces *providercache.Spaces) bool {
log.Info().Str("storage", storage).Interface("spaceCount", spaces.Spaces.Count()).Msg("checking storage")

spaces.Spaces.Range(func(space string, shares *providercache.Shares) bool {
log.Info().Str("storage", storage).Str("space", space).Interface("shareCount", len(shares.Shares)).Msg("checking space")

for _, s := range shares.Shares {
req := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: s.ResourceId, Path: "."},
}
res, err := client.Stat(ctx, req)
if err != nil {
log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not stat shared resource")
}
if res.Status.Code == rpcv1beta1.Code_CODE_NOT_FOUND {
log.Info().Str("storage", storage).Str("space", space).Msg("shared resource does not exist anymore. cleaning up shares")
if err := m.removeShare(ctx, s, false); err != nil {
log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not remove share")
}
}
}

return true
})

return true
})
}
37 changes: 36 additions & 1 deletion pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -319,6 +320,36 @@ func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string, ski
return space.Shares[shareID], nil
}

// All returns all entries in the storage
func (c *Cache) All(ctx context.Context) (*mtimesyncedcache.Map[string, *Spaces], error) {
ctx, span := tracer.Start(ctx, "All")
defer span.End()

providers, err := c.storage.ListDir(ctx, "/storages")
if err != nil {
return nil, err
}
for _, provider := range providers {
storageID := provider.Name
spaces, err := c.storage.ListDir(ctx, path.Join("/storages", storageID))
if err != nil {
return nil, err
}
for _, space := range spaces {
spaceID := strings.TrimSuffix(space.Name, ".json")

unlock := c.LockSpace(spaceID)
span.AddEvent("got lock for space " + spaceID)
if err := c.syncWithLock(ctx, storageID, spaceID); err != nil {
return nil, err
}
unlock()
}
}

return &c.Providers, nil
}

// ListSpace returns the list of shares in a given space
func (c *Cache) ListSpace(ctx context.Context, storageID, spaceID string) (*Shares, error) {
ctx, span := tracer.Start(ctx, "ListSpace")
Expand Down Expand Up @@ -438,7 +469,11 @@ func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error
if !ok {
return nil
}
spaces.Spaces.Store(spaceID, &Shares{})
newShares := &Shares{}
if space, ok := spaces.Spaces.Load(spaceID); ok {
newShares.Etag = space.Etag // keep the etag to allow overwriting the state on the server
}
spaces.Spaces.Store(spaceID, newShares)

return c.Persist(ctx, storageID, spaceID)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,13 @@ var _ = Describe("Cache", func() {
Expect(s).To(BeNil())
})
})

Describe("All", func() {
It("returns all entries", func() {
entries, err := c.All(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(entries.Count()).To(Equal(1))
})
})
})
})
9 changes: 9 additions & 0 deletions pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,12 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
}

func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) }

func (m *Map[K, V]) Count() int {
l := 0
m.Range(func(_ K, _ V) bool {
l++
return true
})
return l
}
6 changes: 5 additions & 1 deletion pkg/storage/utils/metadata/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) {
authCtx, span := tracer.Start(authCtx, "getAuthContext", trace.WithLinks(trace.LinkFromContext(ctx)))
defer span.End()

client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr)
selector, err := pool.GatewaySelector(cs3.gatewayAddr)
if err != nil {
return nil, err
}
client, err := selector.Next()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 17f3395

Please sign in to comment.