Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load, dump and storageprovider fixes for migration #3171

Merged
merged 17 commits into from
Aug 31, 2022
Merged
5 changes: 5 additions & 0 deletions changelog/unreleased/jsoncs3-load.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: cs3 to jsoncs3 share manager migration

We added a Load() to the jsoncs3 and Dump() to the sc3 share manager. The shareid might need to be prefixed with a storageid and space id.

https://github.com/cs3org/reva/pull/3171
27 changes: 19 additions & 8 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,20 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia
}, nil
}

func validateIfMatch(ifMatch string, info *provider.ResourceInfo) bool {
return ifMatch != info.GetEtag()
}
func validateIfUnmodifiedSince(ifUnmodifiedSince *typesv1beta1.Timestamp, info *provider.ResourceInfo) bool {
switch {
case ifUnmodifiedSince == nil || info.GetMtime() == nil:
return true
case utils.LaterTS(info.GetMtime(), ifUnmodifiedSince) == info.GetMtime():
return false
default:
return true
}
}

func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
// TODO(labkode): same considerations as download
log := appctx.GetLogger(ctx)
Expand All @@ -314,20 +328,17 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
metadata := map[string]string{}
ifMatch := req.GetIfMatch()
if ifMatch != "" {
if sRes.Info.Etag != ifMatch {
if !validateIfMatch(ifMatch, sRes.GetInfo()) {
return &provider.InitiateFileUploadResponse{
Status: status.NewFailedPrecondition(ctx, errors.New("etag mismatch"), "etag mismatch"),
}, nil
}
metadata["if-match"] = ifMatch
}
ifUnmodifiedSince := req.GetIfUnmodifiedSince()
if ifUnmodifiedSince != nil {
if utils.LaterTS(sRes.Info.Mtime, ifUnmodifiedSince) == sRes.Info.Mtime {
return &provider.InitiateFileUploadResponse{
Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"),
}, nil
}
if !validateIfUnmodifiedSince(req.GetIfUnmodifiedSince(), sRes.GetInfo()) {
return &provider.InitiateFileUploadResponse{
Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"),
}, nil
}

ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
Expand Down
97 changes: 97 additions & 0 deletions pkg/share/manager/cs3/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,66 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
return nil
}

func (m *Manager) getMetadata(ctx context.Context, shareid, grantee string) ReceivedShareMetadata {
// use default values if the grantee didn't configure anything yet
metadata := ReceivedShareMetadata{
State: collaboration.ShareState_SHARE_STATE_PENDING,
}
data, err := m.storage.SimpleDownload(ctx, path.Join("metadata", shareid, grantee))
if err != nil {
return metadata
}
err = json.Unmarshal(data, &metadata)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("shareid", shareid).Msg("error fetching share")
}
return metadata
}

// Dump exports shares and received shares to channels (e.g. during migration)
func (m *Manager) Dump(ctx context.Context, shareChan chan<- *collaboration.Share, receivedShareChan chan<- share.ReceivedShareWithUser) error {
log := appctx.GetLogger(ctx)
if err := m.initialize(); err != nil {
return err
}

shareids, err := m.storage.ReadDir(ctx, "shares")
if err != nil {
return err
}
for _, shareid := range shareids {
var s *collaboration.Share
if s, err = m.getShareByID(ctx, shareid); err != nil {
log.Error().Err(err).Str("shareid", shareid).Msg("error fetching share")
continue
}
// dump share data
shareChan <- s
// dump grantee metadata that includes share state and mount path
grantees, err := m.storage.ReadDir(ctx, path.Join("metadata", s.Id.OpaqueId))
if err != nil {
continue
}
for _, grantee := range grantees {
metadata := m.getMetadata(ctx, s.GetId().GetOpaqueId(), grantee)
g, err := indexToGrantee(grantee)
if err != nil || g.Type != provider.GranteeType_GRANTEE_TYPE_USER {
// ignore group grants, as every user has his own received state
continue
}
receivedShareChan <- share.ReceivedShareWithUser{
UserID: g.GetUserId(),
ReceivedShare: &collaboration.ReceivedShare{
Share: s,
State: metadata.State,
MountPoint: metadata.MountPoint,
},
}
}
}
return nil
}

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
if err := m.initialize(); err != nil {
Expand Down Expand Up @@ -756,6 +816,43 @@ func granteeToIndex(grantee *provider.Grantee) (string, error) {
}
}

// indexToGrantee tries to unparse a grantee in a metadata dir
// unfortunately, it is just concatenated by :, causing nasty corner cases
func indexToGrantee(name string) (*provider.Grantee, error) {
unescaped, err := url.QueryUnescape(name)
if err != nil {
return nil, err
}
parts := strings.SplitN(unescaped, ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid grantee %s", unescaped)
}
switch parts[0] {
case "user":
lastInd := strings.LastIndex(parts[1], ":")
return &provider.Grantee{
Type: provider.GranteeType_GRANTEE_TYPE_USER,
Id: &provider.Grantee_UserId{
UserId: &userpb.UserId{
Idp: parts[1][:lastInd],
OpaqueId: parts[1][lastInd+1:],
},
},
}, nil
case "group":
return &provider.Grantee{
Type: provider.GranteeType_GRANTEE_TYPE_GROUP,
Id: &provider.Grantee_GroupId{
GroupId: &groupv1beta1.GroupId{
OpaqueId: parts[1],
},
},
}, nil
default:
return nil, fmt.Errorf("invalid grantee %s", unescaped)
}
}

func intersectSlices(a, b []string) []string {
aMap := map[string]bool{}
for _, s := range a {
Expand Down
76 changes: 72 additions & 4 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package jsoncs3

import (
"context"
"strings"
"sync"

"github.com/google/uuid"
Expand All @@ -30,6 +31,7 @@ import (
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/share"
Expand Down Expand Up @@ -251,7 +253,7 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
return nil, err
}

spaceID := md.Id.StorageId + "^" + md.Id.SpaceId
spaceID := md.Id.StorageId + shareid.IDDelimiter + md.Id.SpaceId
// set flag for grantee to have access to share
switch g.Grantee.Type {
case provider.GranteeType_GRANTEE_TYPE_USER:
Expand Down Expand Up @@ -622,7 +624,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)

_ = m.UserReceivedStates.Sync(ctx, userID) // ignore error, cache will be updated on next read
state := m.UserReceivedStates.Get(userID, storageID+"^"+spaceID, s.Id.GetOpaqueId())
state := m.UserReceivedStates.Get(userID, storageID+shareid.IDDelimiter+spaceID, s.Id.GetOpaqueId())
if state != nil {
rs.State = state.State
rs.MountPoint = state.MountPoint
Expand Down Expand Up @@ -682,13 +684,13 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab

userID := ctxpkg.ContextMustGetUser(ctx)

err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+"^"+rs.Share.ResourceId.SpaceId, rs)
err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
// when persisting fails, download, readd and persist again
if err := m.UserReceivedStates.Sync(ctx, userID.GetId().GetOpaqueId()); err != nil {
return nil, err
}
err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+"^"+rs.Share.ResourceId.SpaceId, rs)
err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs)
// TODO try more often?
}
if err != nil {
Expand All @@ -697,3 +699,69 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab

return rs, nil
}

func shareIsRoutable(share *collaboration.Share) bool {
return strings.Contains(share.Id.OpaqueId, shareid.IDDelimiter)
}
func updateShareID(share *collaboration.Share) {
share.Id.OpaqueId = shareid.Encode(share.ResourceId.StorageId, share.ResourceId.SpaceId, share.Id.OpaqueId)
}

// Load imports shares and received shares from channels (e.g. during migration)
func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error {
log := appctx.GetLogger(ctx)
if err := m.initialize(); err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(2)
go func() {
for s := range shareChan {
if s == nil {
continue
}
if !shareIsRoutable(s) {
updateShareID(s)
}
if err := m.Cache.Add(context.Background(), s.GetResourceId().GetStorageId(), s.GetResourceId().GetSpaceId(), s.Id.OpaqueId, s); err != nil {
log.Error().Err(err).Interface("share", s).Msg("error persisting share")
} else {
log.Debug().Str("storageid", s.GetResourceId().GetStorageId()).Str("spaceid", s.GetResourceId().GetSpaceId()).Str("shareid", s.Id.OpaqueId).Msg("imported share")
}
if err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId); err != nil {
log.Error().Err(err).Interface("share", s).Msg("error persisting created cache")
} else {
log.Debug().Str("creatorid", s.GetCreator().GetOpaqueId()).Str("shareid", s.Id.OpaqueId).Msg("updated created cache")
}
}
wg.Done()
}()
go func() {
for s := range receivedShareChan {
if s.ReceivedShare != nil {
if !shareIsRoutable(s.ReceivedShare.GetShare()) {
updateShareID(s.ReceivedShare.GetShare())
}
switch s.ReceivedShare.Share.Grantee.Type {
case provider.GranteeType_GRANTEE_TYPE_USER:
if err := m.UserReceivedStates.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId(), s.ReceivedShare.GetShare().GetResourceId().GetSpaceId(), s.ReceivedShare); err != nil {
log.Error().Err(err).Interface("received share", s).Msg("error persisting received share for user")
} else {
log.Debug().Str("userid", s.ReceivedShare.GetShare().GetGrantee().GetUserId().GetOpaqueId()).Str("spaceid", s.ReceivedShare.GetShare().GetResourceId().GetSpaceId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share userdata")
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
if err := m.GroupReceivedCache.Add(context.Background(), s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId(), s.ReceivedShare.GetShare().GetId().GetOpaqueId()); err != nil {
log.Error().Err(err).Interface("received share", s).Msg("error persisting received share to group cache")
} else {
log.Debug().Str("groupid", s.ReceivedShare.GetShare().GetGrantee().GetGroupId().GetOpaqueId()).Str("shareid", s.ReceivedShare.GetShare().Id.OpaqueId).Msg("updated received share group cache")
}
}
}
}
wg.Done()
}()
wg.Wait()

return nil
}
62 changes: 58 additions & 4 deletions pkg/share/manager/jsoncs3/jsoncs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"

groupv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
Expand All @@ -33,8 +34,10 @@ import (
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/conversions"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
sharespkg "github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"google.golang.org/protobuf/types/known/fieldmaskpb"

Expand Down Expand Up @@ -147,6 +150,57 @@ var _ = Describe("Jsoncs3", func() {
}
})

Describe("Load", func() {
It("loads shares including state and mountpoint information", func() {

sharesChan := make(chan *collaboration.Share)
receivedChan := make(chan sharespkg.ReceivedShareWithUser)

share := &collaboration.Share{
Id: &collaboration.ShareId{OpaqueId: "1iaeiae$vlcvlcvlc!pzbpzbpzb"},
ResourceId: &provider.ResourceId{StorageId: "1iaeiae", SpaceId: "vlcvlcvlc", OpaqueId: "abcd"},
Creator: user1.GetId(),
Grantee: &provider.Grantee{
Type: provider.GranteeType_GRANTEE_TYPE_USER,
Id: &provider.Grantee_UserId{UserId: grantee.GetId()},
},
Permissions: &collaboration.SharePermissions{
Permissions: &provider.ResourcePermissions{
GetPath: true,
InitiateFileDownload: true,
ListFileVersions: true,
ListContainer: true,
Stat: true,
},
},
}

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
err := m.Load(ctx, sharesChan, receivedChan)
Expect(err).ToNot(HaveOccurred())
wg.Done()
}()
go func() {
sharesChan <- share
close(sharesChan)
close(receivedChan)
wg.Done()
}()
wg.Wait()
Eventually(sharesChan).Should(BeClosed())
Eventually(receivedChan).Should(BeClosed())

s, err := m.GetShare(ctx, &collaboration.ShareReference{Spec: &collaboration.ShareReference_Id{
Id: share.Id,
}})

Expect(err).ToNot(HaveOccurred())
Expect(s.ResourceId.OpaqueId).To(Equal(share.ResourceId.OpaqueId))
})
})

Describe("Share", func() {
It("fails if the share already exists", func() {
_, err := m.Share(ctx, sharedResource, grant)
Expand Down Expand Up @@ -580,18 +634,18 @@ var _ = Describe("Jsoncs3", func() {
Expect(len(shares)).To(Equal(1))

// Add a second cache to the provider cache so it can be referenced
Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid^spaceid°secondshare", &collaboration.Share{
Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid"+shareid.IDDelimiter+"spaceid"+shareid.IDDelimiter+"secondshare", &collaboration.Share{
Creator: user1.Id,
})).To(Succeed())

cache := sharecache.UserShareCache{
Mtime: time.Now(),
UserShares: map[string]*sharecache.SpaceShareIDs{
"storageid^spaceid": {
"storageid" + shareid.IDDelimiter + "spaceid": {
Mtime: time.Now(),
IDs: map[string]struct{}{
shares[0].Id.OpaqueId: {},
"storageid^spaceid°secondshare": {},
shares[0].Id.OpaqueId: {},
"storageid" + shareid.IDDelimiter + "spaceid" + shareid.IDDelimiter + "secondshare": {},
},
},
},
Expand Down
Loading