From 1994b84b99f7bf484561db125df134ea92a568e0 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 27 Apr 2022 16:49:47 +0200 Subject: [PATCH] Concurrently stat shares in ocs service --- changelog/unreleased/ocs-concurrent-stat.md | 3 + .../handlers/apps/sharing/shares/public.go | 87 ++++---- .../handlers/apps/sharing/shares/shares.go | 189 ++++++++++-------- .../ocs/handlers/apps/sharing/shares/user.go | 89 ++++++--- 4 files changed, 217 insertions(+), 151 deletions(-) create mode 100644 changelog/unreleased/ocs-concurrent-stat.md diff --git a/changelog/unreleased/ocs-concurrent-stat.md b/changelog/unreleased/ocs-concurrent-stat.md new file mode 100644 index 0000000000..962c722d29 --- /dev/null +++ b/changelog/unreleased/ocs-concurrent-stat.md @@ -0,0 +1,3 @@ +Enhancement: Concurrently resolve shares in ocs HTTP service + +https://github.com/cs3org/reva/pull/2787 \ No newline at end of file diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/public.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/public.go index a56444daaa..39e34bc7b1 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/public.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/public.go @@ -19,11 +19,14 @@ package shares import ( + "context" "encoding/json" "fmt" "net/http" "strconv" + "sync" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" link "github.com/cs3org/go-cs3apis/cs3/sharing/link/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -180,52 +183,68 @@ func (h *Handler) listPublicShares(r *http.Request, filters []*link.ListPublicSh log := appctx.GetLogger(ctx) ocsDataPayload := make([]*conversions.ShareData, 0) - // TODO(refs) why is this guard needed? Are we moving towards a gateway only for service discovery? without a gateway this is dead code. - if h.gatewayAddr != "" { - client, err := pool.GetGatewayServiceClient(h.gatewayAddr) - if err != nil { - return ocsDataPayload, nil, err - } + client, err := pool.GetGatewayServiceClient(h.gatewayAddr) + if err != nil { + return ocsDataPayload, nil, err + } - req := link.ListPublicSharesRequest{ - Filters: filters, - } + req := link.ListPublicSharesRequest{ + Filters: filters, + } - res, err := client.ListPublicShares(ctx, &req) - if err != nil { - return ocsDataPayload, nil, err - } - if res.Status.Code != rpc.Code_CODE_OK { - return ocsDataPayload, res.Status, nil - } + res, err := client.ListPublicShares(ctx, &req) + if err != nil { + return ocsDataPayload, nil, err + } + if res.Status.Code != rpc.Code_CODE_OK { + return ocsDataPayload, res.Status, nil + } - for _, share := range res.GetShare() { - info, status, err := h.getResourceInfoByID(ctx, client, share.ResourceId) - if err != nil || status.Code != rpc.Code_CODE_OK { - log.Debug().Interface("share", share).Interface("status", status).Err(err).Msg("could not stat share, skipping") - continue - } + var wg sync.WaitGroup + workers := 50 + input := make(chan *link.PublicShare, len(res.Share)) + output := make(chan *conversions.ShareData, len(res.Share)) - sData := conversions.PublicShare2ShareData(share, r, h.publicURL) + for i := 0; i < workers; i++ { + wg.Add(1) + go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *link.PublicShare, output chan *conversions.ShareData, wg *sync.WaitGroup) { + defer wg.Done() - sData.Name = share.DisplayName + for share := range input { + info, status, err := h.getResourceInfoByID(ctx, client, share.ResourceId) + if err != nil || status.Code != rpc.Code_CODE_OK { + log.Debug().Interface("share", share).Interface("status", status).Err(err).Msg("could not stat share, skipping") + return + } - if err := h.addFileInfo(ctx, sData, info); err != nil { - log.Debug().Interface("share", share).Interface("info", info).Err(err).Msg("could not add file info, skipping") - continue - } - h.mapUserIds(ctx, client, sData) + sData := conversions.PublicShare2ShareData(share, r, h.publicURL) - log.Debug().Interface("share", share).Interface("info", info).Interface("shareData", share).Msg("mapped") + sData.Name = share.DisplayName - ocsDataPayload = append(ocsDataPayload, sData) + if err := h.addFileInfo(ctx, sData, info); err != nil { + log.Debug().Interface("share", share).Interface("info", info).Err(err).Msg("could not add file info, skipping") + return + } + h.mapUserIds(ctx, client, sData) - } + log.Debug().Interface("share", share).Interface("info", info).Interface("shareData", sData).Msg("mapped") + output <- sData + } + }(ctx, client, input, output, &wg) + } + + for _, share := range res.Share { + input <- share + } + close(input) + wg.Wait() + close(output) - return ocsDataPayload, nil, nil + for s := range output { + ocsDataPayload = append(ocsDataPayload, s) } - return ocsDataPayload, nil, errors.New("bad request") + return ocsDataPayload, nil, nil } func (h *Handler) isPublicShare(r *http.Request, oid string) bool { diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index bbfd776133..c1d96250d5 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -28,6 +28,7 @@ import ( "path" "strconv" "strings" + "sync" "text/template" "time" @@ -799,101 +800,121 @@ func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) { shares := make([]*conversions.ShareData, 0, len(lrsRes.GetShares())) - // TODO(refs) filter out "invalid" shares - for _, rs := range lrsRes.GetShares() { - if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter { - continue - } - var info *provider.ResourceInfo - if pinfo != nil { - // check if the shared resource matches the path resource - if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) { - // try next share - continue - } - // we can reuse the stat info - info = pinfo - } else { - var status *rpc.Status - // FIXME the ResourceID is the id of the resource, but we want the id of the mount point so we can fetch that path, well we have the mountpoint path in the receivedshare - // first stat mount point - mountID := &provider.ResourceId{ - StorageId: utils.ShareStorageProviderID, - OpaqueId: rs.Share.Id.OpaqueId, - } - info, status, err = h.getResourceInfoByID(ctx, client, mountID) - if err != nil || status.Code != rpc.Code_CODE_OK { - // fallback to unmounted resource - info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId) - if err != nil || status.Code != rpc.Code_CODE_OK { - h.logProblems(status, err, "could not stat, skipping") - continue - } - } - } - - data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share) - if err != nil { - log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping") - continue - } + var wg sync.WaitGroup + workers := 50 + input := make(chan *collaboration.ReceivedShare, len(lrsRes.GetShares())) + output := make(chan *conversions.ShareData, len(lrsRes.GetShares())) - data.State = mapState(rs.GetState()) + for i := 0; i < workers; i++ { + wg.Add(1) + go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *collaboration.ReceivedShare, output chan *conversions.ShareData, wg *sync.WaitGroup) { + defer wg.Done() - if err := h.addFileInfo(ctx, data, info); err != nil { - log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping") - continue - } - h.mapUserIds(r.Context(), client, data) - - if data.State == ocsStateAccepted { - // only accepted shares can be accessed when jailing users into their home. - // in this case we cannot stat shared resources that are outside the users home (/home), - // the path (/users/u-u-i-d/foo) will not be accessible - - // in a global namespace we can access the share using the full path - // in a jailed namespace we have to point to the mount point in the users /Shares jail - // - needed for oc10 hot migration - // or use the /dav/spaces/ endpoint? + for rs := range input { + if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter { + return + } + var info *provider.ResourceInfo + if pinfo != nil { + // check if the shared resource matches the path resource + if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) { + // try next share + return + } + // we can reuse the stat info + info = pinfo + } else { + var status *rpc.Status + // FIXME the ResourceID is the id of the resource, but we want the id of the mount point so we can fetch that path, well we have the mountpoint path in the receivedshare + // first stat mount point + mountID := &provider.ResourceId{ + StorageId: utils.ShareStorageProviderID, + OpaqueId: rs.Share.Id.OpaqueId, + } + info, status, err = h.getResourceInfoByID(ctx, client, mountID) + if err != nil || status.Code != rpc.Code_CODE_OK { + // fallback to unmounted resource + info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId) + if err != nil || status.Code != rpc.Code_CODE_OK { + h.logProblems(status, err, "could not stat, skipping") + return + } + } + } - // list /Shares and match fileids with list of received shares - // - only works for a /Shares folder jail - // - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet + data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share) + if err != nil { + log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping") + return + } - // can we return the mountpoint when the gateway resolves the listing of shares? - // - no, the gateway only sees the same list any has the same options as the ocs service - // - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration + data.State = mapState(rs.GetState()) - // best we can do for now is stat the /Shares jail if it is set and return those paths + if err := h.addFileInfo(ctx, data, info); err != nil { + log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping") + return + } + h.mapUserIds(r.Context(), client, data) + + if data.State == ocsStateAccepted { + // only accepted shares can be accessed when jailing users into their home. + // in this case we cannot stat shared resources that are outside the users home (/home), + // the path (/users/u-u-i-d/foo) will not be accessible + + // in a global namespace we can access the share using the full path + // in a jailed namespace we have to point to the mount point in the users /Shares jail + // - needed for oc10 hot migration + // or use the /dav/spaces/ endpoint? + + // list /Shares and match fileids with list of received shares + // - only works for a /Shares folder jail + // - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet + + // can we return the mountpoint when the gateway resolves the listing of shares? + // - no, the gateway only sees the same list any has the same options as the ocs service + // - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration + + // best we can do for now is stat the /Shares jail if it is set and return those paths + + // if we are in a jail and the current share has been accepted use the stat from the share jail + // Needed because received shares can be jailed in a folder in the users home + + if h.sharePrefix != "/" { + // if we have share jail infos use them to build the path + if rs.MountPoint != nil && rs.MountPoint.Path != "" { + // override path with info from share jail + data.FileTarget = path.Join(h.sharePrefix, rs.MountPoint.Path) + data.Path = path.Join(h.sharePrefix, rs.MountPoint.Path) + } else { + data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path)) + data.Path = path.Join(h.sharePrefix, path.Base(info.Path)) + } + } else { + data.FileTarget = info.Path + data.Path = info.Path + } + } else { + // not accepted shares need their Path jailed to make the testsuite happy - // if we are in a jail and the current share has been accepted use the stat from the share jail - // Needed because received shares can be jailed in a folder in the users home + if h.sharePrefix != "/" { + data.Path = path.Join("/", path.Base(info.Path)) + } - if h.sharePrefix != "/" { - // if we have a mount point use it to build the path - if rs.MountPoint != nil && rs.MountPoint.Path != "" { - // override path with info from share jail - data.FileTarget = path.Join(h.sharePrefix, rs.MountPoint.Path) - data.Path = path.Join(h.sharePrefix, rs.MountPoint.Path) - } else { - data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path)) - data.Path = path.Join(h.sharePrefix, path.Base(info.Path)) } - } else { - data.FileTarget = info.Path - data.Path = info.Path - } - } else { - // not accepted shares need their Path jailed to make the testsuite happy - - if h.sharePrefix != "/" { - data.Path = path.Join("/", path.Base(info.Path)) + output <- data } + }(ctx, client, input, output, &wg) + } - } + for _, share := range lrsRes.GetShares() { + input <- share + } + close(input) + wg.Wait() + close(output) - shares = append(shares, data) - log.Debug().Msgf("share: %+v", *data) + for s := range output { + shares = append(shares, s) } response.WriteOCSSuccess(w, r, shares) diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/user.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/user.go index 335e0b9448..e482810e32 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/user.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/user.go @@ -19,8 +19,11 @@ package shares import ( + "context" "net/http" + "sync" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" @@ -190,45 +193,65 @@ func (h *Handler) listUserShares(r *http.Request, filters []*collaboration.Filte } ocsDataPayload := make([]*conversions.ShareData, 0) - if h.gatewayAddr != "" { - // get a connection to the users share provider - client, err := h.getClient() - if err != nil { - return ocsDataPayload, nil, err - } + client, err := h.getClient() + if err != nil { + return ocsDataPayload, nil, err + } - // do list shares request. filtered - lsUserSharesResponse, err := client.ListShares(ctx, &lsUserSharesRequest) - if err != nil { - return ocsDataPayload, nil, err - } - if lsUserSharesResponse.Status.Code != rpc.Code_CODE_OK { - return ocsDataPayload, lsUserSharesResponse.Status, nil - } + // do list shares request. filtered + lsUserSharesResponse, err := client.ListShares(ctx, &lsUserSharesRequest) + if err != nil { + return ocsDataPayload, nil, err + } + if lsUserSharesResponse.Status.Code != rpc.Code_CODE_OK { + return ocsDataPayload, lsUserSharesResponse.Status, nil + } - // build OCS response payload - for _, s := range lsUserSharesResponse.Shares { - data, err := conversions.CS3Share2ShareData(ctx, s) - if err != nil { - log.Debug().Interface("share", s).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping") - continue - } + var wg sync.WaitGroup + workers := 50 + input := make(chan *collaboration.Share, len(lsUserSharesResponse.Shares)) + output := make(chan *conversions.ShareData, len(lsUserSharesResponse.Shares)) - info, status, err := h.getResourceInfoByID(ctx, client, s.ResourceId) - if err != nil || status.Code != rpc.Code_CODE_OK { - log.Debug().Interface("share", s).Interface("status", status).Interface("shareData", data).Err(err).Msg("could not stat share, skipping") - continue - } + for i := 0; i < workers; i++ { + wg.Add(1) + go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *collaboration.Share, output chan *conversions.ShareData, wg *sync.WaitGroup) { + defer wg.Done() + + // build OCS response payload + for s := range input { + data, err := conversions.CS3Share2ShareData(ctx, s) + if err != nil { + log.Debug().Interface("share", s).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping") + return + } + + info, status, err := h.getResourceInfoByID(ctx, client, s.ResourceId) + if err != nil || status.Code != rpc.Code_CODE_OK { + log.Debug().Interface("share", s).Interface("status", status).Interface("shareData", data).Err(err).Msg("could not stat share, skipping") + return + } + + if err := h.addFileInfo(ctx, data, info); err != nil { + log.Debug().Interface("share", s).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping") + return + } + h.mapUserIds(ctx, client, data) - if err := h.addFileInfo(ctx, data, info); err != nil { - log.Debug().Interface("share", s).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping") - continue + log.Debug().Interface("share", s).Interface("info", info).Interface("shareData", data).Msg("mapped") + output <- data } - h.mapUserIds(ctx, client, data) + }(ctx, client, input, output, &wg) + } - log.Debug().Interface("share", s).Interface("info", info).Interface("shareData", data).Msg("mapped") - ocsDataPayload = append(ocsDataPayload, data) - } + for _, share := range lsUserSharesResponse.Shares { + input <- share + } + close(input) + wg.Wait() + close(output) + + for s := range output { + ocsDataPayload = append(ocsDataPayload, s) } return ocsDataPayload, nil, nil