From 660e79ece62ffc302c5e7d30821410cfcd293b0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 23 Aug 2022 15:15:21 +0000 Subject: [PATCH 01/16] load, dump and storageprovider fixes for migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../storageprovider/storageprovider.go | 2 +- pkg/share/manager/cs3/cs3.go | 88 +++++++++++++++++++ pkg/share/manager/jsoncs3/jsoncs3.go | 51 +++++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 6f4d1dc7f2..b48c5a8391 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -323,7 +323,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate } ifUnmodifiedSince := req.GetIfUnmodifiedSince() if ifUnmodifiedSince != nil { - if utils.LaterTS(sRes.Info.Mtime, ifUnmodifiedSince) == sRes.Info.Mtime { + if sRes.Info != nil && sRes.Info.Mtime != nil && utils.LaterTS(sRes.Info.Mtime, ifUnmodifiedSince) == sRes.Info.Mtime { return &provider.InitiateFileUploadResponse{ Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"), }, nil diff --git a/pkg/share/manager/cs3/cs3.go b/pkg/share/manager/cs3/cs3.go index 04635e8d42..8fb8671572 100644 --- a/pkg/share/manager/cs3/cs3.go +++ b/pkg/share/manager/cs3/cs3.go @@ -206,6 +206,57 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar return nil } +// Dump exports shares and received shares to channels (e.g. during migration) +func (m *Manager) Dump(ctx context.Context, shareChan chan<- *collaboration.Share, receivedShareChan chan<- share.ReceivedShareWithUser) error { + log := appctx.GetLogger(ctx) + if err := m.initialize(); err != nil { + return err + } + + shareids, err := m.storage.ReadDir(ctx, "shares") + if err != nil { + log.Error().Err(err).Msg("error fetching shares") + } + for _, shareid := range shareids { + if s, err := m.getShareByID(ctx, shareid); err == nil { + // dump share data + shareChan <- s + // dump grantee metadata that includes share state and mount path + grantees, err := m.storage.ReadDir(ctx, path.Join("metadata", s.Id.OpaqueId)) + if err != nil { + continue + } + for _, grantee := range grantees { // use default values if the grantee didn't configure anything yet + metadata := ReceivedShareMetadata{ + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + data, err := m.storage.SimpleDownload(ctx, path.Join("metadata", s.Id.OpaqueId, grantee)) + if err == nil { + err = json.Unmarshal(data, &metadata) + if err != nil { + continue + } + } + g, err := indexToGrantee(grantee) + if err != nil || g.Type != provider.GranteeType_GRANTEE_TYPE_USER { + continue + } + receivedShareChan <- share.ReceivedShareWithUser{ + UserID: g.GetUserId(), + ReceivedShare: &collaboration.ReceivedShare{ + Share: s, + State: metadata.State, + MountPoint: metadata.MountPoint, + }, + } + } + } else { + log.Error().Err(err).Str("shareid", shareid).Msg("error fetching share") + } + } + return nil +} + // Share creates a new share func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { if err := m.initialize(); err != nil { @@ -756,6 +807,43 @@ func granteeToIndex(grantee *provider.Grantee) (string, error) { } } +// indexToGrantee trues to unparse a grantee in a metadata dir +// unfortunately, it is just concatenated by :, causing nasty corner cases +func indexToGrantee(name string) (*provider.Grantee, error) { + unescaped, err := url.QueryUnescape(name) + if err != nil { + return nil, err + } + parts := strings.SplitN(unescaped, ":", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid grantee %s", unescaped) + } + switch parts[0] { + case "user": + lastInd := strings.LastIndex(parts[1], ":") + return &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{ + UserId: &userpb.UserId{ + Idp: parts[1][:lastInd], + OpaqueId: parts[1][lastInd+1:], + }, + }, + }, nil + case "group": + return &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_GROUP, + Id: &provider.Grantee_GroupId{ + GroupId: &groupv1beta1.GroupId{ + OpaqueId: parts[1], + }, + }, + }, nil + default: + return nil, fmt.Errorf("invalid grantee %s", unescaped) + } +} + func intersectSlices(a, b []string) []string { aMap := map[string]bool{} for _, s := range a { diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 0d93272f3a..5c183358d4 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -30,6 +30,7 @@ import ( userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/share" @@ -697,3 +698,53 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab return rs, nil } + +// Load imports shares and received shares from channels (e.g. during migration) +func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error { + log := appctx.GetLogger(ctx) + if err := m.initialize(); err != nil { + return err + } + + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(2) + go func() { + for s := range shareChan { + if s == nil { + continue + } + mu.Lock() + if err := m.Cache.Add(context.Background(), s.ResourceId.StorageId, s.ResourceId.SpaceId, s.Id.OpaqueId, s); err != nil { + log.Error().Err(err).Interface("share", s).Msg("error persisting share") + } + if err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId); err != nil { + log.Error().Err(err).Interface("share", s).Msg("error persisting created cache") + } + mu.Unlock() + } + wg.Done() + }() + go func() { + for s := range receivedShareChan { + if s.ReceivedShare != nil { + mu.Lock() + switch s.ReceivedShare.Share.Grantee.Type { + case provider.GranteeType_GRANTEE_TYPE_USER: + if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil { + log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") + } + case provider.GranteeType_GRANTEE_TYPE_GROUP: + if err := m.GroupReceivedCache.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil { + log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") + } + } + mu.Unlock() + } + } + wg.Done() + }() + wg.Wait() + + return nil +} From e4962dc269090a03d7fec75356bc8d02a0f52893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Wed, 24 Aug 2022 14:58:57 +0000 Subject: [PATCH 02/16] jsoncs3 load testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/jsoncs3.go | 18 ++++-- pkg/share/manager/jsoncs3/jsoncs3_test.go | 63 +++++++++++++++++-- .../jsoncs3/providercache/providercache.go | 13 ++++ .../manager/jsoncs3/sharecache/sharecache.go | 4 +- pkg/share/manager/jsoncs3/shareid/shareid.go | 11 +++- 5 files changed, 95 insertions(+), 14 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 5c183358d4..16045c5f56 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -252,7 +252,7 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla return nil, err } - spaceID := md.Id.StorageId + "^" + md.Id.SpaceId + spaceID := md.Id.StorageId + shareid.ProviderDelimiter + md.Id.SpaceId // set flag for grantee to have access to share switch g.Grantee.Type { case provider.GranteeType_GRANTEE_TYPE_USER: @@ -623,7 +623,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) _ = m.UserReceivedStates.Sync(ctx, userID) // ignore error, cache will be updated on next read - state := m.UserReceivedStates.Get(userID, storageID+"^"+spaceID, s.Id.GetOpaqueId()) + state := m.UserReceivedStates.Get(userID, storageID+shareid.ProviderDelimiter+spaceID, s.Id.GetOpaqueId()) if state != nil { rs.State = state.State rs.MountPoint = state.MountPoint @@ -683,13 +683,13 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab userID := ctxpkg.ContextMustGetUser(ctx) - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+"^"+rs.Share.ResourceId.SpaceId, rs) + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.ProviderDelimiter+rs.Share.ResourceId.SpaceId, rs) if _, ok := err.(errtypes.IsPreconditionFailed); ok { // when persisting fails, download, readd and persist again if err := m.UserReceivedStates.Sync(ctx, userID.GetId().GetOpaqueId()); err != nil { return nil, err } - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+"^"+rs.Share.ResourceId.SpaceId, rs) + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.ProviderDelimiter+rs.Share.ResourceId.SpaceId, rs) // TODO try more often? } if err != nil { @@ -715,11 +715,15 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar continue } mu.Lock() - if err := m.Cache.Add(context.Background(), s.ResourceId.StorageId, s.ResourceId.SpaceId, s.Id.OpaqueId, s); err != nil { + if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil { log.Error().Err(err).Interface("share", s).Msg("error persisting share") + } else { + log.Info().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") } if err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId); err != nil { log.Error().Err(err).Interface("share", s).Msg("error persisting created cache") + } else { + log.Info().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") } mu.Unlock() } @@ -733,10 +737,14 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar case provider.GranteeType_GRANTEE_TYPE_USER: if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil { log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") + } else { + log.Info().Str("userid", s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId()).Str("spaceid", s.ReceivedShare.GetShare().GetResourceId().GetSpaceId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") } case provider.GranteeType_GRANTEE_TYPE_GROUP: if err := m.GroupReceivedCache.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil { log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") + } else { + log.Info().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") } } mu.Unlock() diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index ba65b96037..ca4be1309a 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "time" groupv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" @@ -33,8 +34,10 @@ import ( providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/conversions" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + sharespkg "github.com/cs3org/reva/v2/pkg/share" "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3" "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid" "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -147,6 +150,58 @@ var _ = Describe("Jsoncs3", func() { } }) + Describe("Load", func() { + It("loads shares including state and mountpoint information", func() { + + sharesChan := make(chan *collaboration.Share) + receivedChan := make(chan sharespkg.ReceivedShareWithUser) + + share := &collaboration.Share{ + Id: &collaboration.ShareId{OpaqueId: "1iaeiae$vlcvlcvlc!pzbpzbpzb"}, + // FIXME we may have to deal with importing existing share ids ... without a storage or provider prefix + ResourceId: &provider.ResourceId{StorageId: "1iaeiae", SpaceId: "vlcvlcvlc", OpaqueId: "abcd"}, + Creator: user1.GetId(), + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: grantee.GetId()}, + }, + Permissions: &collaboration.SharePermissions{ + Permissions: &provider.ResourcePermissions{ + GetPath: true, + InitiateFileDownload: true, + ListFileVersions: true, + ListContainer: true, + Stat: true, + }, + }, + } + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + err := m.Load(ctx, sharesChan, receivedChan) + Expect(err).ToNot(HaveOccurred()) + wg.Done() + }() + go func() { + sharesChan <- share + close(sharesChan) + close(receivedChan) + wg.Done() + }() + wg.Wait() + Eventually(sharesChan).Should(BeClosed()) + Eventually(receivedChan).Should(BeClosed()) + + s, err := m.GetShare(ctx, &collaboration.ShareReference{Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }}) + + Expect(err).ToNot(HaveOccurred()) + Expect(s.ResourceId.OpaqueId).To(Equal(share.ResourceId.OpaqueId)) + }) + }) + Describe("Share", func() { It("fails if the share already exists", func() { _, err := m.Share(ctx, sharedResource, grant) @@ -580,18 +635,18 @@ var _ = Describe("Jsoncs3", func() { Expect(len(shares)).To(Equal(1)) // Add a second cache to the provider cache so it can be referenced - Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid^spaceid°secondshare", &collaboration.Share{ + Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid"+shareid.ProviderDelimiter+"spaceid"+shareid.SpaceDelimiter+"secondshare", &collaboration.Share{ Creator: user1.Id, })).To(Succeed()) cache := sharecache.UserShareCache{ Mtime: time.Now(), UserShares: map[string]*sharecache.SpaceShareIDs{ - "storageid^spaceid": { + "storageid" + shareid.ProviderDelimiter + "spaceid": { Mtime: time.Now(), IDs: map[string]struct{}{ - shares[0].Id.OpaqueId: {}, - "storageid^spaceid°secondshare": {}, + shares[0].Id.OpaqueId: {}, + "storageid" + shareid.ProviderDelimiter + "spaceid" + shareid.SpaceDelimiter + "secondshare": {}, }, }, }, diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go index 83210d270b..4404cf55ca 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -21,6 +21,8 @@ package providercache import ( "context" "encoding/json" + "fmt" + "os" "path" "path/filepath" "time" @@ -101,6 +103,14 @@ func New(s metadata.Storage) Cache { // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error { + switch { + case storageID == "": + return fmt.Errorf("missing storage id") + case spaceID == "": + return fmt.Errorf("missing space id") + case shareID == "": + return fmt.Errorf("missing share id") + } c.initializeIfNeeded(storageID, spaceID) c.Providers[storageID].Spaces[spaceID].Shares[shareID] = share @@ -191,6 +201,9 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error { if _, ok := err.(errtypes.NotFound); ok { return nil // Nothing to sync against } + if _, ok := err.(*os.PathError); ok { + return nil // Nothing to sync against + } log.Error().Err(err).Msg("Failed to stat the provider cache") return err } diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/pkg/share/manager/jsoncs3/sharecache/sharecache.go index 82cb19b27e..6f50feadef 100644 --- a/pkg/share/manager/jsoncs3/sharecache/sharecache.go +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -68,7 +68,7 @@ func New(s metadata.Storage, namespace, filename string) Cache { // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) - ssid := storageid + "^" + spaceid + ssid := storageid + shareid.ProviderDelimiter + spaceid now := time.Now() if c.UserShares[userid] == nil { @@ -91,7 +91,7 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error { // Remove removes a share for the given user func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) - ssid := storageid + "^" + spaceid + ssid := storageid + shareid.ProviderDelimiter + spaceid if c.UserShares[userid] != nil { if c.UserShares[userid].UserShares[ssid] != nil { diff --git a/pkg/share/manager/jsoncs3/shareid/shareid.go b/pkg/share/manager/jsoncs3/shareid/shareid.go index 65d774d198..ada1498b46 100644 --- a/pkg/share/manager/jsoncs3/shareid/shareid.go +++ b/pkg/share/manager/jsoncs3/shareid/shareid.go @@ -20,21 +20,26 @@ package shareid import "strings" +const ( + ProviderDelimiter = "$" //"^" + SpaceDelimiter = "!" //"°" +) + // Encode encodes a share id func Encode(providerID, spaceID, shareID string) string { - return providerID + "^" + spaceID + "°" + shareID + return providerID + ProviderDelimiter + spaceID + SpaceDelimiter + shareID } // Decode decodes an encoded shareid // share ids are of the format ^° func Decode(id string) (string, string, string) { - parts := strings.SplitN(id, "^", 2) + parts := strings.SplitN(id, ProviderDelimiter, 2) if len(parts) == 1 { return "", "", parts[0] } storageid := parts[0] - parts = strings.SplitN(parts[1], "°", 2) + parts = strings.SplitN(parts[1], SpaceDelimiter, 2) if len(parts) == 1 { return storageid, parts[0], "" } From eeab7b0f98d307517fc97cdce1f0ecb03d957013 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 25 Aug 2022 12:47:07 +0000 Subject: [PATCH 03/16] update unroutable shareids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/jsoncs3.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 16045c5f56..6401b72a99 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -20,6 +20,7 @@ package jsoncs3 import ( "context" + "strings" "sync" "github.com/google/uuid" @@ -699,6 +700,16 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab return rs, nil } +func shareIsRoutable(share *collaboration.Share) bool { + if strings.Contains(share.Id.OpaqueId, shareid.ProviderDelimiter) && strings.Contains(share.Id.OpaqueId, shareid.SpaceDelimiter) { + return true + } + return false +} +func updateShareID(share *collaboration.Share) { + share.Id.OpaqueId = shareid.Encode(share.ResourceId.StorageId, share.ResourceId.SpaceId, share.Id.OpaqueId) +} + // Load imports shares and received shares from channels (e.g. during migration) func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error { log := appctx.GetLogger(ctx) @@ -714,6 +725,9 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if s == nil { continue } + if !shareIsRoutable(s) { + updateShareID(s) + } mu.Lock() if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil { log.Error().Err(err).Interface("share", s).Msg("error persisting share") @@ -732,6 +746,9 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar go func() { for s := range receivedShareChan { if s.ReceivedShare != nil { + if !shareIsRoutable(s.ReceivedShare.GetShare()) { + updateShareID(s.ReceivedShare.GetShare()) + } mu.Lock() switch s.ReceivedShare.Share.Grantee.Type { case provider.GranteeType_GRANTEE_TYPE_USER: From 141714de2c719c7d100f86aa5da5ed43e991a853 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 25 Aug 2022 12:47:30 +0000 Subject: [PATCH 04/16] add some debug logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/providercache/providercache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go index 4404cf55ca..8aa2ea1042 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -199,9 +199,11 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error { info, err := c.storage.Stat(ctx, jsonPath) if err != nil { if _, ok := err.(errtypes.NotFound); ok { + log.Debug().Msg("no json file, nothing to sync") return nil // Nothing to sync against } if _, ok := err.(*os.PathError); ok { + log.Debug().Msg("no storage dir, nothing to sync") return nil // Nothing to sync against } log.Error().Err(err).Msg("Failed to stat the provider cache") From 252743816ad5e6661aabd38da4a2f1c587c379d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 25 Aug 2022 14:44:41 +0000 Subject: [PATCH 05/16] add changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/jsoncs3-load.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/jsoncs3-load.md diff --git a/changelog/unreleased/jsoncs3-load.md b/changelog/unreleased/jsoncs3-load.md new file mode 100644 index 0000000000..a804928ffc --- /dev/null +++ b/changelog/unreleased/jsoncs3-load.md @@ -0,0 +1,5 @@ +Enhancement: cs3 to jsoncs3 share manager migration + +We added a Load() to the jsoncs3 and Dump() to the sc3 share manager. The shareid might need to be prefixed with a storageid and space id. + +https://github.com/cs3org/reva/pull/3171 \ No newline at end of file From 5088044612529544ecfb7cd5d346e14cf44da983 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 26 Aug 2022 09:06:28 +0000 Subject: [PATCH 06/16] use : as id delimiter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/jsoncs3.go | 10 ++++---- pkg/share/manager/jsoncs3/jsoncs3_test.go | 6 ++--- .../manager/jsoncs3/sharecache/sharecache.go | 4 +-- pkg/share/manager/jsoncs3/shareid/shareid.go | 25 ++++++++----------- 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 6401b72a99..56ad95e15b 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -253,7 +253,7 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla return nil, err } - spaceID := md.Id.StorageId + shareid.ProviderDelimiter + md.Id.SpaceId + spaceID := md.Id.StorageId + shareid.IdDelimiter + md.Id.SpaceId // set flag for grantee to have access to share switch g.Grantee.Type { case provider.GranteeType_GRANTEE_TYPE_USER: @@ -624,7 +624,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) _ = m.UserReceivedStates.Sync(ctx, userID) // ignore error, cache will be updated on next read - state := m.UserReceivedStates.Get(userID, storageID+shareid.ProviderDelimiter+spaceID, s.Id.GetOpaqueId()) + state := m.UserReceivedStates.Get(userID, storageID+shareid.IdDelimiter+spaceID, s.Id.GetOpaqueId()) if state != nil { rs.State = state.State rs.MountPoint = state.MountPoint @@ -684,13 +684,13 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab userID := ctxpkg.ContextMustGetUser(ctx) - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.ProviderDelimiter+rs.Share.ResourceId.SpaceId, rs) + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IdDelimiter+rs.Share.ResourceId.SpaceId, rs) if _, ok := err.(errtypes.IsPreconditionFailed); ok { // when persisting fails, download, readd and persist again if err := m.UserReceivedStates.Sync(ctx, userID.GetId().GetOpaqueId()); err != nil { return nil, err } - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.ProviderDelimiter+rs.Share.ResourceId.SpaceId, rs) + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IdDelimiter+rs.Share.ResourceId.SpaceId, rs) // TODO try more often? } if err != nil { @@ -701,7 +701,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab } func shareIsRoutable(share *collaboration.Share) bool { - if strings.Contains(share.Id.OpaqueId, shareid.ProviderDelimiter) && strings.Contains(share.Id.OpaqueId, shareid.SpaceDelimiter) { + if strings.Contains(share.Id.OpaqueId, shareid.IdDelimiter) { return true } return false diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index ca4be1309a..e6e7d4fae9 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -635,18 +635,18 @@ var _ = Describe("Jsoncs3", func() { Expect(len(shares)).To(Equal(1)) // Add a second cache to the provider cache so it can be referenced - Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid"+shareid.ProviderDelimiter+"spaceid"+shareid.SpaceDelimiter+"secondshare", &collaboration.Share{ + Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid"+shareid.IdDelimiter+"spaceid"+shareid.IdDelimiter+"secondshare", &collaboration.Share{ Creator: user1.Id, })).To(Succeed()) cache := sharecache.UserShareCache{ Mtime: time.Now(), UserShares: map[string]*sharecache.SpaceShareIDs{ - "storageid" + shareid.ProviderDelimiter + "spaceid": { + "storageid" + shareid.IdDelimiter + "spaceid": { Mtime: time.Now(), IDs: map[string]struct{}{ shares[0].Id.OpaqueId: {}, - "storageid" + shareid.ProviderDelimiter + "spaceid" + shareid.SpaceDelimiter + "secondshare": {}, + "storageid" + shareid.IdDelimiter + "spaceid" + shareid.IdDelimiter + "secondshare": {}, }, }, }, diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/pkg/share/manager/jsoncs3/sharecache/sharecache.go index 6f50feadef..f4c8a7c772 100644 --- a/pkg/share/manager/jsoncs3/sharecache/sharecache.go +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -68,7 +68,7 @@ func New(s metadata.Storage, namespace, filename string) Cache { // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) - ssid := storageid + shareid.ProviderDelimiter + spaceid + ssid := storageid + shareid.IdDelimiter + spaceid now := time.Now() if c.UserShares[userid] == nil { @@ -91,7 +91,7 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error { // Remove removes a share for the given user func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) - ssid := storageid + shareid.ProviderDelimiter + spaceid + ssid := storageid + shareid.IdDelimiter + spaceid if c.UserShares[userid] != nil { if c.UserShares[userid].UserShares[ssid] != nil { diff --git a/pkg/share/manager/jsoncs3/shareid/shareid.go b/pkg/share/manager/jsoncs3/shareid/shareid.go index ada1498b46..5f0e26c52b 100644 --- a/pkg/share/manager/jsoncs3/shareid/shareid.go +++ b/pkg/share/manager/jsoncs3/shareid/shareid.go @@ -21,28 +21,25 @@ package shareid import "strings" const ( - ProviderDelimiter = "$" //"^" - SpaceDelimiter = "!" //"°" + // IdDelimiter is used to separate the providerid, spaceid and shareid + IdDelimiter = ":" ) // Encode encodes a share id func Encode(providerID, spaceID, shareID string) string { - return providerID + ProviderDelimiter + spaceID + SpaceDelimiter + shareID + return providerID + IdDelimiter + spaceID + IdDelimiter + shareID } // Decode decodes an encoded shareid -// share ids are of the format ^° +// share ids are of the format :: func Decode(id string) (string, string, string) { - parts := strings.SplitN(id, ProviderDelimiter, 2) - if len(parts) == 1 { + parts := strings.SplitN(id, IdDelimiter, 3) + switch len(parts) { + case 1: return "", "", parts[0] + case 2: + return parts[0], parts[1], "" + default: + return parts[0], parts[1], parts[2] } - - storageid := parts[0] - parts = strings.SplitN(parts[1], SpaceDelimiter, 2) - if len(parts) == 1 { - return storageid, parts[0], "" - } - - return storageid, parts[0], parts[1] } From bb1fd387367c916b72f1a57b0c2e116d2a1ec1d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 26 Aug 2022 11:57:11 +0000 Subject: [PATCH 07/16] lint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/jsoncs3.go | 13 +++++-------- pkg/share/manager/jsoncs3/jsoncs3_test.go | 6 +++--- pkg/share/manager/jsoncs3/sharecache/sharecache.go | 4 ++-- pkg/share/manager/jsoncs3/shareid/shareid.go | 8 ++++---- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 56ad95e15b..693ff8f9c3 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -253,7 +253,7 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla return nil, err } - spaceID := md.Id.StorageId + shareid.IdDelimiter + md.Id.SpaceId + spaceID := md.Id.StorageId + shareid.IDDelimiter + md.Id.SpaceId // set flag for grantee to have access to share switch g.Grantee.Type { case provider.GranteeType_GRANTEE_TYPE_USER: @@ -624,7 +624,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) _ = m.UserReceivedStates.Sync(ctx, userID) // ignore error, cache will be updated on next read - state := m.UserReceivedStates.Get(userID, storageID+shareid.IdDelimiter+spaceID, s.Id.GetOpaqueId()) + state := m.UserReceivedStates.Get(userID, storageID+shareid.IDDelimiter+spaceID, s.Id.GetOpaqueId()) if state != nil { rs.State = state.State rs.MountPoint = state.MountPoint @@ -684,13 +684,13 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab userID := ctxpkg.ContextMustGetUser(ctx) - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IdDelimiter+rs.Share.ResourceId.SpaceId, rs) + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs) if _, ok := err.(errtypes.IsPreconditionFailed); ok { // when persisting fails, download, readd and persist again if err := m.UserReceivedStates.Sync(ctx, userID.GetId().GetOpaqueId()); err != nil { return nil, err } - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IdDelimiter+rs.Share.ResourceId.SpaceId, rs) + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs) // TODO try more often? } if err != nil { @@ -701,10 +701,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab } func shareIsRoutable(share *collaboration.Share) bool { - if strings.Contains(share.Id.OpaqueId, shareid.IdDelimiter) { - return true - } - return false + return strings.Contains(share.Id.OpaqueId, shareid.IDDelimiter) } func updateShareID(share *collaboration.Share) { share.Id.OpaqueId = shareid.Encode(share.ResourceId.StorageId, share.ResourceId.SpaceId, share.Id.OpaqueId) diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index e6e7d4fae9..ec4898ecfa 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -635,18 +635,18 @@ var _ = Describe("Jsoncs3", func() { Expect(len(shares)).To(Equal(1)) // Add a second cache to the provider cache so it can be referenced - Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid"+shareid.IdDelimiter+"spaceid"+shareid.IdDelimiter+"secondshare", &collaboration.Share{ + Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid"+shareid.IDDelimiter+"spaceid"+shareid.IDDelimiter+"secondshare", &collaboration.Share{ Creator: user1.Id, })).To(Succeed()) cache := sharecache.UserShareCache{ Mtime: time.Now(), UserShares: map[string]*sharecache.SpaceShareIDs{ - "storageid" + shareid.IdDelimiter + "spaceid": { + "storageid" + shareid.IDDelimiter + "spaceid": { Mtime: time.Now(), IDs: map[string]struct{}{ shares[0].Id.OpaqueId: {}, - "storageid" + shareid.IdDelimiter + "spaceid" + shareid.IdDelimiter + "secondshare": {}, + "storageid" + shareid.IDDelimiter + "spaceid" + shareid.IDDelimiter + "secondshare": {}, }, }, }, diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/pkg/share/manager/jsoncs3/sharecache/sharecache.go index f4c8a7c772..b7699ab8f4 100644 --- a/pkg/share/manager/jsoncs3/sharecache/sharecache.go +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -68,7 +68,7 @@ func New(s metadata.Storage, namespace, filename string) Cache { // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) - ssid := storageid + shareid.IdDelimiter + spaceid + ssid := storageid + shareid.IDDelimiter + spaceid now := time.Now() if c.UserShares[userid] == nil { @@ -91,7 +91,7 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error { // Remove removes a share for the given user func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) - ssid := storageid + shareid.IdDelimiter + spaceid + ssid := storageid + shareid.IDDelimiter + spaceid if c.UserShares[userid] != nil { if c.UserShares[userid].UserShares[ssid] != nil { diff --git a/pkg/share/manager/jsoncs3/shareid/shareid.go b/pkg/share/manager/jsoncs3/shareid/shareid.go index 5f0e26c52b..3340ac36dd 100644 --- a/pkg/share/manager/jsoncs3/shareid/shareid.go +++ b/pkg/share/manager/jsoncs3/shareid/shareid.go @@ -21,19 +21,19 @@ package shareid import "strings" const ( - // IdDelimiter is used to separate the providerid, spaceid and shareid - IdDelimiter = ":" + // IDDelimiter is used to separate the providerid, spaceid and shareid + IDDelimiter = ":" ) // Encode encodes a share id func Encode(providerID, spaceID, shareID string) string { - return providerID + IdDelimiter + spaceID + IdDelimiter + shareID + return providerID + IDDelimiter + spaceID + IDDelimiter + shareID } // Decode decodes an encoded shareid // share ids are of the format :: func Decode(id string) (string, string, string) { - parts := strings.SplitN(id, IdDelimiter, 3) + parts := strings.SplitN(id, IDDelimiter, 3) switch len(parts) { case 1: return "", "", parts[0] From 449144dfb3a7a0bb83af6cddd8908823faa58db1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 26 Aug 2022 14:00:44 +0000 Subject: [PATCH 08/16] prevent panic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/providercache/providercache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go index 8aa2ea1042..5fc975b9ae 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -139,7 +139,7 @@ func (c *Cache) Get(storageID, spaceID, shareID string) *collaboration.Share { // ListSpace returns the list of shares in a given space func (c *Cache) ListSpace(storageID, spaceID string) *Shares { - if c.Providers[storageID] == nil { + if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil { return &Shares{} } return c.Providers[storageID].Spaces[spaceID] From cf914f4e8a20ee7ed4d058b5b2e3789c8b061a14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 13:04:30 +0200 Subject: [PATCH 09/16] Apply some suggestions from code review --- pkg/share/manager/cs3/cs3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/share/manager/cs3/cs3.go b/pkg/share/manager/cs3/cs3.go index 8fb8671572..7fc1e2b9b0 100644 --- a/pkg/share/manager/cs3/cs3.go +++ b/pkg/share/manager/cs3/cs3.go @@ -215,7 +215,7 @@ func (m *Manager) Dump(ctx context.Context, shareChan chan<- *collaboration.Shar shareids, err := m.storage.ReadDir(ctx, "shares") if err != nil { - log.Error().Err(err).Msg("error fetching shares") + return err } for _, shareid := range shareids { if s, err := m.getShareByID(ctx, shareid); err == nil { @@ -807,7 +807,7 @@ func granteeToIndex(grantee *provider.Grantee) (string, error) { } } -// indexToGrantee trues to unparse a grantee in a metadata dir +// indexToGrantee tries to unparse a grantee in a metadata dir // unfortunately, it is just concatenated by :, causing nasty corner cases func indexToGrantee(name string) (*provider.Grantee, error) { unescaped, err := url.QueryUnescape(name) From 4d6fcf18cb0593b7d208e67b78d49139e252b849 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 13:04:53 +0200 Subject: [PATCH 10/16] Update internal/grpc/services/storageprovider/storageprovider.go Co-authored-by: kobergj --- internal/grpc/services/storageprovider/storageprovider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index b48c5a8391..148bb2c923 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -323,7 +323,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate } ifUnmodifiedSince := req.GetIfUnmodifiedSince() if ifUnmodifiedSince != nil { - if sRes.Info != nil && sRes.Info.Mtime != nil && utils.LaterTS(sRes.Info.Mtime, ifUnmodifiedSince) == sRes.Info.Mtime { + if utils.LaterTS(sRes.GetInfo().GetMtime(), ifUnmodifiedSince) == sRes.Info.Mtime { return &provider.InitiateFileUploadResponse{ Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"), }, nil From 747c36132ec26c130f7d73b9f37bdb1f2fb179f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 11:25:56 +0000 Subject: [PATCH 11/16] extract method, drop implemented FIXME MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/cs3/cs3.go | 30 ++++++++++++++--------- pkg/share/manager/jsoncs3/jsoncs3_test.go | 3 +-- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/share/manager/cs3/cs3.go b/pkg/share/manager/cs3/cs3.go index 7fc1e2b9b0..ac0d6dd12d 100644 --- a/pkg/share/manager/cs3/cs3.go +++ b/pkg/share/manager/cs3/cs3.go @@ -206,6 +206,22 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar return nil } +func (m *Manager) getMetadata(ctx context.Context, shareid, grantee string) ReceivedShareMetadata { + // use default values if the grantee didn't configure anything yet + metadata := ReceivedShareMetadata{ + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + data, err := m.storage.SimpleDownload(ctx, path.Join("metadata", shareid, grantee)) + if err != nil { + return metadata + } + err = json.Unmarshal(data, &metadata) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("shareid", shareid).Msg("error fetching share") + } + return metadata +} + // Dump exports shares and received shares to channels (e.g. during migration) func (m *Manager) Dump(ctx context.Context, shareChan chan<- *collaboration.Share, receivedShareChan chan<- share.ReceivedShareWithUser) error { log := appctx.GetLogger(ctx) @@ -226,19 +242,11 @@ func (m *Manager) Dump(ctx context.Context, shareChan chan<- *collaboration.Shar if err != nil { continue } - for _, grantee := range grantees { // use default values if the grantee didn't configure anything yet - metadata := ReceivedShareMetadata{ - State: collaboration.ShareState_SHARE_STATE_PENDING, - } - data, err := m.storage.SimpleDownload(ctx, path.Join("metadata", s.Id.OpaqueId, grantee)) - if err == nil { - err = json.Unmarshal(data, &metadata) - if err != nil { - continue - } - } + for _, grantee := range grantees { + metadata := m.getMetadata(ctx, s.GetId().GetOpaqueId(), grantee) g, err := indexToGrantee(grantee) if err != nil || g.Type != provider.GranteeType_GRANTEE_TYPE_USER { + // ignore group grants, as every user has his own received state continue } receivedShareChan <- share.ReceivedShareWithUser{ diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index ec4898ecfa..2fcb29823d 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -157,8 +157,7 @@ var _ = Describe("Jsoncs3", func() { receivedChan := make(chan sharespkg.ReceivedShareWithUser) share := &collaboration.Share{ - Id: &collaboration.ShareId{OpaqueId: "1iaeiae$vlcvlcvlc!pzbpzbpzb"}, - // FIXME we may have to deal with importing existing share ids ... without a storage or provider prefix + Id: &collaboration.ShareId{OpaqueId: "1iaeiae$vlcvlcvlc!pzbpzbpzb"}, ResourceId: &provider.ResourceId{StorageId: "1iaeiae", SpaceId: "vlcvlcvlc", OpaqueId: "abcd"}, Creator: user1.GetId(), Grantee: &provider.Grantee{ From 4cc9091a1e041ec6c159507d1be75feab91a30f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 14:12:09 +0000 Subject: [PATCH 12/16] extract validation functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../storageprovider/storageprovider.go | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 148bb2c923..2295a14634 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -288,6 +288,20 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia }, nil } +func validateIfMatch(ifMatch string, info *provider.ResourceInfo) bool { + return ifMatch != info.GetEtag() +} +func validateIfUnmodifiedSince(ifUnmodifiedSince *typesv1beta1.Timestamp, info *provider.ResourceInfo) bool { + switch { + case ifUnmodifiedSince == nil || info.GetMtime() == nil: + return true + case utils.LaterTS(info.GetMtime(), ifUnmodifiedSince) == info.GetMtime(): + return false + default: + return true + } +} + func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) { // TODO(labkode): same considerations as download log := appctx.GetLogger(ctx) @@ -314,20 +328,17 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate metadata := map[string]string{} ifMatch := req.GetIfMatch() if ifMatch != "" { - if sRes.Info.Etag != ifMatch { + if !validateIfMatch(ifMatch, sRes.GetInfo()) { return &provider.InitiateFileUploadResponse{ Status: status.NewFailedPrecondition(ctx, errors.New("etag mismatch"), "etag mismatch"), }, nil } metadata["if-match"] = ifMatch } - ifUnmodifiedSince := req.GetIfUnmodifiedSince() - if ifUnmodifiedSince != nil { - if utils.LaterTS(sRes.GetInfo().GetMtime(), ifUnmodifiedSince) == sRes.Info.Mtime { - return &provider.InitiateFileUploadResponse{ - Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"), - }, nil - } + if !validateIfUnmodifiedSince(req.GetIfUnmodifiedSince(), sRes.GetInfo()) { + return &provider.InitiateFileUploadResponse{ + Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"), + }, nil } ctx = ctxpkg.ContextSetLockID(ctx, req.LockId) From 65fe5426cfab6e86e303384bbb1b1bba97f3301f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 14:12:37 +0000 Subject: [PATCH 13/16] remove unnecessary mutex from Load() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/jsoncs3.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 693ff8f9c3..c7c2ecd029 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -714,7 +714,6 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar return err } - var mu sync.Mutex var wg sync.WaitGroup wg.Add(2) go func() { @@ -725,7 +724,6 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if !shareIsRoutable(s) { updateShareID(s) } - mu.Lock() if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil { log.Error().Err(err).Interface("share", s).Msg("error persisting share") } else { @@ -736,7 +734,6 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar } else { log.Info().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") } - mu.Unlock() } wg.Done() }() @@ -746,7 +743,6 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if !shareIsRoutable(s.ReceivedShare.GetShare()) { updateShareID(s.ReceivedShare.GetShare()) } - mu.Lock() switch s.ReceivedShare.Share.Grantee.Type { case provider.GranteeType_GRANTEE_TYPE_USER: if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil { @@ -761,7 +757,6 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar log.Info().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") } } - mu.Unlock() } } wg.Done() From 8f991ef8399725bf224ee0bf7448ba24523a3e5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 14:58:41 +0000 Subject: [PATCH 14/16] decrease log level to debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/jsoncs3/jsoncs3.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index c7c2ecd029..e56339c7db 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -727,12 +727,12 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil { log.Error().Err(err).Interface("share", s).Msg("error persisting share") } else { - log.Info().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") + log.Debug().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share") } if err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId); err != nil { log.Error().Err(err).Interface("share", s).Msg("error persisting created cache") } else { - log.Info().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") + log.Debug().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache") } } wg.Done() @@ -748,13 +748,13 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil { log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user") } else { - log.Info().Str("userid", s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId()).Str("spaceid", s.ReceivedShare.GetShare().GetResourceId().GetSpaceId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") + log.Debug().Str("userid", s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId()).Str("spaceid", s.ReceivedShare.GetShare().GetResourceId().GetSpaceId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata") } case provider.GranteeType_GRANTEE_TYPE_GROUP: if err := m.GroupReceivedCache.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil { log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache") } else { - log.Info().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") + log.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache") } } } From dfe0a84be2beae8134d6fe7799f0482a198c299e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 30 Aug 2022 14:59:01 +0000 Subject: [PATCH 15/16] prevent nil MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../manager/jsoncs3/sharecache/sharecache.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/pkg/share/manager/jsoncs3/sharecache/sharecache.go index b7699ab8f4..fab9f1a435 100644 --- a/pkg/share/manager/jsoncs3/sharecache/sharecache.go +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -93,16 +93,21 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { storageid, spaceid, _ := shareid.Decode(shareID) ssid := storageid + shareid.IDDelimiter + spaceid - if c.UserShares[userid] != nil { - if c.UserShares[userid].UserShares[ssid] != nil { - // remove share id - now := time.Now() - c.UserShares[userid].Mtime = now - c.UserShares[userid].UserShares[ssid].Mtime = now - delete(c.UserShares[userid].UserShares[ssid].IDs, shareID) + now := time.Now() + if c.UserShares[userid] == nil { + c.UserShares[userid] = &UserShareCache{ + Mtime: now, + UserShares: map[string]*SpaceShareIDs{}, } } + if c.UserShares[userid].UserShares[ssid] != nil { + // remove share id + c.UserShares[userid].Mtime = now + c.UserShares[userid].UserShares[ssid].Mtime = now + delete(c.UserShares[userid].UserShares[ssid].IDs, shareID) + } + return c.Persist(ctx, userid) } From f8df5bfc086b24372dcbb97e8a49e44b0fd2f5f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Wed, 31 Aug 2022 08:35:37 +0000 Subject: [PATCH 16/16] use more ideomatic go error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/share/manager/cs3/cs3.go | 47 ++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/pkg/share/manager/cs3/cs3.go b/pkg/share/manager/cs3/cs3.go index ac0d6dd12d..9775c9f8d9 100644 --- a/pkg/share/manager/cs3/cs3.go +++ b/pkg/share/manager/cs3/cs3.go @@ -234,32 +234,33 @@ func (m *Manager) Dump(ctx context.Context, shareChan chan<- *collaboration.Shar return err } for _, shareid := range shareids { - if s, err := m.getShareByID(ctx, shareid); err == nil { - // dump share data - shareChan <- s - // dump grantee metadata that includes share state and mount path - grantees, err := m.storage.ReadDir(ctx, path.Join("metadata", s.Id.OpaqueId)) - if err != nil { + var s *collaboration.Share + if s, err = m.getShareByID(ctx, shareid); err != nil { + log.Error().Err(err).Str("shareid", shareid).Msg("error fetching share") + continue + } + // dump share data + shareChan <- s + // dump grantee metadata that includes share state and mount path + grantees, err := m.storage.ReadDir(ctx, path.Join("metadata", s.Id.OpaqueId)) + if err != nil { + continue + } + for _, grantee := range grantees { + metadata := m.getMetadata(ctx, s.GetId().GetOpaqueId(), grantee) + g, err := indexToGrantee(grantee) + if err != nil || g.Type != provider.GranteeType_GRANTEE_TYPE_USER { + // ignore group grants, as every user has his own received state continue } - for _, grantee := range grantees { - metadata := m.getMetadata(ctx, s.GetId().GetOpaqueId(), grantee) - g, err := indexToGrantee(grantee) - if err != nil || g.Type != provider.GranteeType_GRANTEE_TYPE_USER { - // ignore group grants, as every user has his own received state - continue - } - receivedShareChan <- share.ReceivedShareWithUser{ - UserID: g.GetUserId(), - ReceivedShare: &collaboration.ReceivedShare{ - Share: s, - State: metadata.State, - MountPoint: metadata.MountPoint, - }, - } + receivedShareChan <- share.ReceivedShareWithUser{ + UserID: g.GetUserId(), + ReceivedShare: &collaboration.ReceivedShare{ + Share: s, + State: metadata.State, + MountPoint: metadata.MountPoint, + }, } - } else { - log.Error().Err(err).Str("shareid", shareid).Msg("error fetching share") } } return nil