Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrently stat shares in ocs service #2787

Merged
merged 1 commit into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/ocs-concurrent-stat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Concurrently resolve shares in ocs HTTP service

https://github.com/cs3org/reva/pull/2787
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package shares

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"sync"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
link "github.com/cs3org/go-cs3apis/cs3/sharing/link/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -180,52 +183,68 @@ func (h *Handler) listPublicShares(r *http.Request, filters []*link.ListPublicSh
log := appctx.GetLogger(ctx)

ocsDataPayload := make([]*conversions.ShareData, 0)
// TODO(refs) why is this guard needed? Are we moving towards a gateway only for service discovery? without a gateway this is dead code.
if h.gatewayAddr != "" {
client, err := pool.GetGatewayServiceClient(h.gatewayAddr)
if err != nil {
return ocsDataPayload, nil, err
}
client, err := pool.GetGatewayServiceClient(h.gatewayAddr)
if err != nil {
return ocsDataPayload, nil, err
}

req := link.ListPublicSharesRequest{
Filters: filters,
}
req := link.ListPublicSharesRequest{
Filters: filters,
}

res, err := client.ListPublicShares(ctx, &req)
if err != nil {
return ocsDataPayload, nil, err
}
if res.Status.Code != rpc.Code_CODE_OK {
return ocsDataPayload, res.Status, nil
}
res, err := client.ListPublicShares(ctx, &req)
if err != nil {
return ocsDataPayload, nil, err
}
if res.Status.Code != rpc.Code_CODE_OK {
return ocsDataPayload, res.Status, nil
}

for _, share := range res.GetShare() {
info, status, err := h.getResourceInfoByID(ctx, client, share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
log.Debug().Interface("share", share).Interface("status", status).Err(err).Msg("could not stat share, skipping")
continue
}
var wg sync.WaitGroup
workers := 50
input := make(chan *link.PublicShare, len(res.Share))
output := make(chan *conversions.ShareData, len(res.Share))

sData := conversions.PublicShare2ShareData(share, r, h.publicURL)
for i := 0; i < workers; i++ {
wg.Add(1)
go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *link.PublicShare, output chan *conversions.ShareData, wg *sync.WaitGroup) {
defer wg.Done()

sData.Name = share.DisplayName
for share := range input {
info, status, err := h.getResourceInfoByID(ctx, client, share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
log.Debug().Interface("share", share).Interface("status", status).Err(err).Msg("could not stat share, skipping")
return
}

if err := h.addFileInfo(ctx, sData, info); err != nil {
log.Debug().Interface("share", share).Interface("info", info).Err(err).Msg("could not add file info, skipping")
continue
}
h.mapUserIds(ctx, client, sData)
sData := conversions.PublicShare2ShareData(share, r, h.publicURL)

log.Debug().Interface("share", share).Interface("info", info).Interface("shareData", share).Msg("mapped")
sData.Name = share.DisplayName

ocsDataPayload = append(ocsDataPayload, sData)
if err := h.addFileInfo(ctx, sData, info); err != nil {
log.Debug().Interface("share", share).Interface("info", info).Err(err).Msg("could not add file info, skipping")
return
}
h.mapUserIds(ctx, client, sData)

}
log.Debug().Interface("share", share).Interface("info", info).Interface("shareData", sData).Msg("mapped")
output <- sData
}
}(ctx, client, input, output, &wg)
}

for _, share := range res.Share {
input <- share
}
close(input)
wg.Wait()
close(output)

return ocsDataPayload, nil, nil
for s := range output {
ocsDataPayload = append(ocsDataPayload, s)
}

return ocsDataPayload, nil, errors.New("bad request")
return ocsDataPayload, nil, nil
}

func (h *Handler) isPublicShare(r *http.Request, oid string) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"text/template"
"time"

Expand Down Expand Up @@ -799,101 +800,121 @@ func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) {

shares := make([]*conversions.ShareData, 0, len(lrsRes.GetShares()))

// TODO(refs) filter out "invalid" shares
for _, rs := range lrsRes.GetShares() {
if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter {
continue
}
var info *provider.ResourceInfo
if pinfo != nil {
// check if the shared resource matches the path resource
if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) {
// try next share
continue
}
// we can reuse the stat info
info = pinfo
} else {
var status *rpc.Status
// FIXME the ResourceID is the id of the resource, but we want the id of the mount point so we can fetch that path, well we have the mountpoint path in the receivedshare
// first stat mount point
mountID := &provider.ResourceId{
StorageId: utils.ShareStorageProviderID,
OpaqueId: rs.Share.Id.OpaqueId,
}
info, status, err = h.getResourceInfoByID(ctx, client, mountID)
if err != nil || status.Code != rpc.Code_CODE_OK {
// fallback to unmounted resource
info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
h.logProblems(status, err, "could not stat, skipping")
continue
}
}
}

data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping")
continue
}
var wg sync.WaitGroup
workers := 50
input := make(chan *collaboration.ReceivedShare, len(lrsRes.GetShares()))
output := make(chan *conversions.ShareData, len(lrsRes.GetShares()))

data.State = mapState(rs.GetState())
for i := 0; i < workers; i++ {
wg.Add(1)
go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *collaboration.ReceivedShare, output chan *conversions.ShareData, wg *sync.WaitGroup) {
defer wg.Done()

if err := h.addFileInfo(ctx, data, info); err != nil {
log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping")
continue
}
h.mapUserIds(r.Context(), client, data)

if data.State == ocsStateAccepted {
// only accepted shares can be accessed when jailing users into their home.
// in this case we cannot stat shared resources that are outside the users home (/home),
// the path (/users/u-u-i-d/foo) will not be accessible

// in a global namespace we can access the share using the full path
// in a jailed namespace we have to point to the mount point in the users /Shares jail
// - needed for oc10 hot migration
// or use the /dav/spaces/<space id> endpoint?
for rs := range input {
if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter {
return
}
var info *provider.ResourceInfo
if pinfo != nil {
// check if the shared resource matches the path resource
if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) {
// try next share
return
}
// we can reuse the stat info
info = pinfo
} else {
var status *rpc.Status
// FIXME the ResourceID is the id of the resource, but we want the id of the mount point so we can fetch that path, well we have the mountpoint path in the receivedshare
// first stat mount point
mountID := &provider.ResourceId{
StorageId: utils.ShareStorageProviderID,
OpaqueId: rs.Share.Id.OpaqueId,
}
info, status, err = h.getResourceInfoByID(ctx, client, mountID)
if err != nil || status.Code != rpc.Code_CODE_OK {
// fallback to unmounted resource
info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
h.logProblems(status, err, "could not stat, skipping")
return
}
}
}

// list /Shares and match fileids with list of received shares
// - only works for a /Shares folder jail
// - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet
data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping")
return
}

// can we return the mountpoint when the gateway resolves the listing of shares?
// - no, the gateway only sees the same list any has the same options as the ocs service
// - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration
data.State = mapState(rs.GetState())

// best we can do for now is stat the /Shares jail if it is set and return those paths
if err := h.addFileInfo(ctx, data, info); err != nil {
log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping")
return
}
h.mapUserIds(r.Context(), client, data)

if data.State == ocsStateAccepted {
// only accepted shares can be accessed when jailing users into their home.
// in this case we cannot stat shared resources that are outside the users home (/home),
// the path (/users/u-u-i-d/foo) will not be accessible

// in a global namespace we can access the share using the full path
// in a jailed namespace we have to point to the mount point in the users /Shares jail
// - needed for oc10 hot migration
// or use the /dav/spaces/<space id> endpoint?

// list /Shares and match fileids with list of received shares
// - only works for a /Shares folder jail
// - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet

// can we return the mountpoint when the gateway resolves the listing of shares?
// - no, the gateway only sees the same list any has the same options as the ocs service
// - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration

// best we can do for now is stat the /Shares jail if it is set and return those paths

// if we are in a jail and the current share has been accepted use the stat from the share jail
// Needed because received shares can be jailed in a folder in the users home

if h.sharePrefix != "/" {
// if we have share jail infos use them to build the path
if rs.MountPoint != nil && rs.MountPoint.Path != "" {
// override path with info from share jail
data.FileTarget = path.Join(h.sharePrefix, rs.MountPoint.Path)
data.Path = path.Join(h.sharePrefix, rs.MountPoint.Path)
} else {
data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path))
data.Path = path.Join(h.sharePrefix, path.Base(info.Path))
}
} else {
data.FileTarget = info.Path
data.Path = info.Path
}
} else {
// not accepted shares need their Path jailed to make the testsuite happy

// if we are in a jail and the current share has been accepted use the stat from the share jail
// Needed because received shares can be jailed in a folder in the users home
if h.sharePrefix != "/" {
data.Path = path.Join("/", path.Base(info.Path))
}

if h.sharePrefix != "/" {
// if we have a mount point use it to build the path
if rs.MountPoint != nil && rs.MountPoint.Path != "" {
// override path with info from share jail
data.FileTarget = path.Join(h.sharePrefix, rs.MountPoint.Path)
data.Path = path.Join(h.sharePrefix, rs.MountPoint.Path)
} else {
data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path))
data.Path = path.Join(h.sharePrefix, path.Base(info.Path))
}
} else {
data.FileTarget = info.Path
data.Path = info.Path
}
} else {
// not accepted shares need their Path jailed to make the testsuite happy

if h.sharePrefix != "/" {
data.Path = path.Join("/", path.Base(info.Path))
output <- data
}
}(ctx, client, input, output, &wg)
}

}
for _, share := range lrsRes.GetShares() {
input <- share
}
close(input)
wg.Wait()
close(output)

shares = append(shares, data)
log.Debug().Msgf("share: %+v", *data)
for s := range output {
shares = append(shares, s)
}

response.WriteOCSSuccess(w, r, shares)
Expand Down
Loading