Skip to content

Commit

Permalink
Concurrently stat shares in ocs service
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Apr 27, 2022
1 parent 618964e commit 1994b84
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 151 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/ocs-concurrent-stat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Concurrently resolve shares in ocs HTTP service

https://github.com/cs3org/reva/pull/2787
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package shares

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"sync"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
link "github.com/cs3org/go-cs3apis/cs3/sharing/link/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -180,52 +183,68 @@ func (h *Handler) listPublicShares(r *http.Request, filters []*link.ListPublicSh
log := appctx.GetLogger(ctx)

ocsDataPayload := make([]*conversions.ShareData, 0)
// TODO(refs) why is this guard needed? Are we moving towards a gateway only for service discovery? without a gateway this is dead code.
if h.gatewayAddr != "" {
client, err := pool.GetGatewayServiceClient(h.gatewayAddr)
if err != nil {
return ocsDataPayload, nil, err
}
client, err := pool.GetGatewayServiceClient(h.gatewayAddr)
if err != nil {
return ocsDataPayload, nil, err
}

req := link.ListPublicSharesRequest{
Filters: filters,
}
req := link.ListPublicSharesRequest{
Filters: filters,
}

res, err := client.ListPublicShares(ctx, &req)
if err != nil {
return ocsDataPayload, nil, err
}
if res.Status.Code != rpc.Code_CODE_OK {
return ocsDataPayload, res.Status, nil
}
res, err := client.ListPublicShares(ctx, &req)
if err != nil {
return ocsDataPayload, nil, err
}
if res.Status.Code != rpc.Code_CODE_OK {
return ocsDataPayload, res.Status, nil
}

for _, share := range res.GetShare() {
info, status, err := h.getResourceInfoByID(ctx, client, share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
log.Debug().Interface("share", share).Interface("status", status).Err(err).Msg("could not stat share, skipping")
continue
}
var wg sync.WaitGroup
workers := 50
input := make(chan *link.PublicShare, len(res.Share))
output := make(chan *conversions.ShareData, len(res.Share))

sData := conversions.PublicShare2ShareData(share, r, h.publicURL)
for i := 0; i < workers; i++ {
wg.Add(1)
go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *link.PublicShare, output chan *conversions.ShareData, wg *sync.WaitGroup) {
defer wg.Done()

sData.Name = share.DisplayName
for share := range input {
info, status, err := h.getResourceInfoByID(ctx, client, share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
log.Debug().Interface("share", share).Interface("status", status).Err(err).Msg("could not stat share, skipping")
return
}

if err := h.addFileInfo(ctx, sData, info); err != nil {
log.Debug().Interface("share", share).Interface("info", info).Err(err).Msg("could not add file info, skipping")
continue
}
h.mapUserIds(ctx, client, sData)
sData := conversions.PublicShare2ShareData(share, r, h.publicURL)

log.Debug().Interface("share", share).Interface("info", info).Interface("shareData", share).Msg("mapped")
sData.Name = share.DisplayName

ocsDataPayload = append(ocsDataPayload, sData)
if err := h.addFileInfo(ctx, sData, info); err != nil {
log.Debug().Interface("share", share).Interface("info", info).Err(err).Msg("could not add file info, skipping")
return
}
h.mapUserIds(ctx, client, sData)

}
log.Debug().Interface("share", share).Interface("info", info).Interface("shareData", sData).Msg("mapped")
output <- sData
}
}(ctx, client, input, output, &wg)
}

for _, share := range res.Share {
input <- share
}
close(input)
wg.Wait()
close(output)

return ocsDataPayload, nil, nil
for s := range output {
ocsDataPayload = append(ocsDataPayload, s)
}

return ocsDataPayload, nil, errors.New("bad request")
return ocsDataPayload, nil, nil
}

func (h *Handler) isPublicShare(r *http.Request, oid string) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"text/template"
"time"

Expand Down Expand Up @@ -799,101 +800,121 @@ func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) {

shares := make([]*conversions.ShareData, 0, len(lrsRes.GetShares()))

// TODO(refs) filter out "invalid" shares
for _, rs := range lrsRes.GetShares() {
if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter {
continue
}
var info *provider.ResourceInfo
if pinfo != nil {
// check if the shared resource matches the path resource
if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) {
// try next share
continue
}
// we can reuse the stat info
info = pinfo
} else {
var status *rpc.Status
// FIXME the ResourceID is the id of the resource, but we want the id of the mount point so we can fetch that path, well we have the mountpoint path in the receivedshare
// first stat mount point
mountID := &provider.ResourceId{
StorageId: utils.ShareStorageProviderID,
OpaqueId: rs.Share.Id.OpaqueId,
}
info, status, err = h.getResourceInfoByID(ctx, client, mountID)
if err != nil || status.Code != rpc.Code_CODE_OK {
// fallback to unmounted resource
info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
h.logProblems(status, err, "could not stat, skipping")
continue
}
}
}

data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping")
continue
}
var wg sync.WaitGroup
workers := 50
input := make(chan *collaboration.ReceivedShare, len(lrsRes.GetShares()))
output := make(chan *conversions.ShareData, len(lrsRes.GetShares()))

data.State = mapState(rs.GetState())
for i := 0; i < workers; i++ {
wg.Add(1)
go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *collaboration.ReceivedShare, output chan *conversions.ShareData, wg *sync.WaitGroup) {
defer wg.Done()

if err := h.addFileInfo(ctx, data, info); err != nil {
log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping")
continue
}
h.mapUserIds(r.Context(), client, data)

if data.State == ocsStateAccepted {
// only accepted shares can be accessed when jailing users into their home.
// in this case we cannot stat shared resources that are outside the users home (/home),
// the path (/users/u-u-i-d/foo) will not be accessible

// in a global namespace we can access the share using the full path
// in a jailed namespace we have to point to the mount point in the users /Shares jail
// - needed for oc10 hot migration
// or use the /dav/spaces/<space id> endpoint?
for rs := range input {
if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter {
return
}
var info *provider.ResourceInfo
if pinfo != nil {
// check if the shared resource matches the path resource
if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) {
// try next share
return
}
// we can reuse the stat info
info = pinfo
} else {
var status *rpc.Status
// FIXME the ResourceID is the id of the resource, but we want the id of the mount point so we can fetch that path, well we have the mountpoint path in the receivedshare
// first stat mount point
mountID := &provider.ResourceId{
StorageId: utils.ShareStorageProviderID,
OpaqueId: rs.Share.Id.OpaqueId,
}
info, status, err = h.getResourceInfoByID(ctx, client, mountID)
if err != nil || status.Code != rpc.Code_CODE_OK {
// fallback to unmounted resource
info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
h.logProblems(status, err, "could not stat, skipping")
return
}
}
}

// list /Shares and match fileids with list of received shares
// - only works for a /Shares folder jail
// - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet
data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping")
return
}

// can we return the mountpoint when the gateway resolves the listing of shares?
// - no, the gateway only sees the same list any has the same options as the ocs service
// - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration
data.State = mapState(rs.GetState())

// best we can do for now is stat the /Shares jail if it is set and return those paths
if err := h.addFileInfo(ctx, data, info); err != nil {
log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping")
return
}
h.mapUserIds(r.Context(), client, data)

if data.State == ocsStateAccepted {
// only accepted shares can be accessed when jailing users into their home.
// in this case we cannot stat shared resources that are outside the users home (/home),
// the path (/users/u-u-i-d/foo) will not be accessible

// in a global namespace we can access the share using the full path
// in a jailed namespace we have to point to the mount point in the users /Shares jail
// - needed for oc10 hot migration
// or use the /dav/spaces/<space id> endpoint?

// list /Shares and match fileids with list of received shares
// - only works for a /Shares folder jail
// - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet

// can we return the mountpoint when the gateway resolves the listing of shares?
// - no, the gateway only sees the same list any has the same options as the ocs service
// - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration

// best we can do for now is stat the /Shares jail if it is set and return those paths

// if we are in a jail and the current share has been accepted use the stat from the share jail
// Needed because received shares can be jailed in a folder in the users home

if h.sharePrefix != "/" {
// if we have share jail infos use them to build the path
if rs.MountPoint != nil && rs.MountPoint.Path != "" {
// override path with info from share jail
data.FileTarget = path.Join(h.sharePrefix, rs.MountPoint.Path)
data.Path = path.Join(h.sharePrefix, rs.MountPoint.Path)
} else {
data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path))
data.Path = path.Join(h.sharePrefix, path.Base(info.Path))
}
} else {
data.FileTarget = info.Path
data.Path = info.Path
}
} else {
// not accepted shares need their Path jailed to make the testsuite happy

// if we are in a jail and the current share has been accepted use the stat from the share jail
// Needed because received shares can be jailed in a folder in the users home
if h.sharePrefix != "/" {
data.Path = path.Join("/", path.Base(info.Path))
}

if h.sharePrefix != "/" {
// if we have a mount point use it to build the path
if rs.MountPoint != nil && rs.MountPoint.Path != "" {
// override path with info from share jail
data.FileTarget = path.Join(h.sharePrefix, rs.MountPoint.Path)
data.Path = path.Join(h.sharePrefix, rs.MountPoint.Path)
} else {
data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path))
data.Path = path.Join(h.sharePrefix, path.Base(info.Path))
}
} else {
data.FileTarget = info.Path
data.Path = info.Path
}
} else {
// not accepted shares need their Path jailed to make the testsuite happy

if h.sharePrefix != "/" {
data.Path = path.Join("/", path.Base(info.Path))
output <- data
}
}(ctx, client, input, output, &wg)
}

}
for _, share := range lrsRes.GetShares() {
input <- share
}
close(input)
wg.Wait()
close(output)

shares = append(shares, data)
log.Debug().Msgf("share: %+v", *data)
for s := range output {
shares = append(shares, s)
}

response.WriteOCSSuccess(w, r, shares)
Expand Down
Loading

0 comments on commit 1994b84

Please sign in to comment.