Skip to content

Commit

Permalink
Aggregrate resource info properties for virtual views (#2215)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Oct 28, 2021
1 parent 4879bf6 commit 80b2065
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 144 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/eos-virtual-views.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Aggregrate resource info properties for virtual views

https://github.com/cs3org/reva/pull/2215
226 changes: 93 additions & 133 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"net/url"
"path"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -1283,6 +1282,7 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St
Status: status.NewStatusFromErrType(ctx, "stat ref: "+req.Ref.String(), err),
}, nil
}
providers = getUniqueProviders(providers)

resPath := req.Ref.GetPath()
if len(providers) == 1 && (utils.IsRelativeReference(req.Ref) || resPath == "" || strings.HasPrefix(resPath, providers[0].ProviderPath)) {
Expand All @@ -1303,69 +1303,47 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St
}

func (s *svc) statAcrossProviders(ctx context.Context, req *provider.StatRequest, providers []*registry.ProviderInfo) (*provider.StatResponse, error) {
// TODO(ishank011): aggregrate properties such as etag, checksum, etc.
log := appctx.GetLogger(ctx)

infoFromProviders := make([]*provider.ResourceInfo, len(providers))
errors := make([]error, len(providers))
var wg sync.WaitGroup

for i, p := range providers {
wg.Add(1)
go s.statOnProvider(ctx, req, &infoFromProviders[i], p, &errors[i], &wg)
info := &provider.ResourceInfo{
Id: &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Path: req.Ref.GetPath(),
MimeType: "httpd/unix-directory",
Size: 0,
Mtime: &types.Timestamp{},
}
wg.Wait()

var totalSize uint64
for i := range providers {
if errors[i] != nil {
log.Warn().Msgf("statting on provider %s returned err %+v", providers[i].ProviderPath, errors[i])
for _, p := range providers {
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
log.Err(err).Msg("error connecting to storage provider=" + p.Address)
continue
}
resp, err := c.Stat(ctx, req)
if err != nil {
log.Err(err).Msgf("gateway: error calling Stat %s: %+v", req.Ref.String(), p)
continue
}
if resp.Status.Code != rpc.Code_CODE_OK {
log.Err(status.NewErrorFromCode(rpc.Code_CODE_OK, "gateway"))
continue
}
if infoFromProviders[i] != nil {
totalSize += infoFromProviders[i].Size
if resp.Info != nil {
info.Size += resp.Info.Size
info.Mtime = utils.LaterTS(info.Mtime, resp.Info.Mtime)
}
}

// TODO(ishank011): aggregrate other properties for references spread across storage providers, eg. /eos
return &provider.StatResponse{
Status: status.NewOK(ctx),
Info: &provider.ResourceInfo{
Id: &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Path: req.Ref.GetPath(),
Size: totalSize,
},
Info: info,
}, nil
}

func (s *svc) statOnProvider(ctx context.Context, req *provider.StatRequest, res **provider.ResourceInfo, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
return
}

if utils.IsAbsoluteReference(req.Ref) {
resPath := path.Clean(req.Ref.GetPath())
newPath := req.Ref.GetPath()
if resPath != "." && !strings.HasPrefix(resPath, p.ProviderPath) {
newPath = p.ProviderPath
}
req.Ref = &provider.Reference{Path: newPath}
}

r, err := c.Stat(ctx, req)
if err != nil {
*e = errors.Wrap(err, fmt.Sprintf("gateway: error calling Stat %s on %+v", req.Ref, p))
return
}
*res = r.Info
}

func (s *svc) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {

if utils.IsRelativeReference(req.Ref) {
Expand Down Expand Up @@ -1651,60 +1629,74 @@ func (s *svc) listSharesFolder(ctx context.Context) (*provider.ListContainerResp
}

func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
log := appctx.GetLogger(ctx)
providers, err := s.findProviders(ctx, req.Ref)
if err != nil {
return &provider.ListContainerResponse{
Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), err),
}, nil
}
providers = getUniqueProviders(providers)

infoFromProviders := make([][]*provider.ResourceInfo, len(providers))
errors := make([]error, len(providers))
indirects := make([]bool, len(providers))
var wg sync.WaitGroup

for i, p := range providers {
wg.Add(1)
go s.listContainerOnProvider(ctx, req, &infoFromProviders[i], p, &indirects[i], &errors[i], &wg)
resPath := req.Ref.GetPath()
if len(providers) == 1 && (utils.IsRelativeReference(req.Ref) || resPath == "" || strings.HasPrefix(resPath, providers[0].ProviderPath)) {
c, err := s.getStorageProviderClient(ctx, providers[0])
if err != nil {
return &provider.ListContainerResponse{
Status: status.NewInternal(ctx, err, "error connecting to storage provider="+providers[0].Address),
}, nil
}
rsp, err := c.ListContainer(ctx, req)
if err != nil || rsp.Status.Code != rpc.Code_CODE_OK {
return rsp, err
}
return rsp, nil
}
wg.Wait()

infos := []*provider.ResourceInfo{}
nestedInfos := make(map[string][]*provider.ResourceInfo)
for i := range providers {
if errors[i] != nil {
// return if there's only one mount, else skip this one
if len(providers) == 1 {
return &provider.ListContainerResponse{
Status: status.NewStatusFromErrType(ctx, "listContainer ref: "+req.Ref.String(), errors[i]),
}, nil
}
log.Warn().Msgf("listing container on provider %s returned err %+v", providers[i].ProviderPath, errors[i])
return s.listContainerAcrossProviders(ctx, req, providers)
}

func (s *svc) listContainerAcrossProviders(ctx context.Context, req *provider.ListContainerRequest, providers []*registry.ProviderInfo) (*provider.ListContainerResponse, error) {
nestedInfos := make(map[string]*provider.ResourceInfo)
log := appctx.GetLogger(ctx)

for _, p := range providers {
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
log.Err(err).Msg("error connecting to storage provider=" + p.Address)
continue
}
resp, err := c.ListContainer(ctx, req)
if err != nil {
log.Err(err).Msgf("gateway: error calling Stat %s: %+v", req.Ref.String(), p)
continue
}
if resp.Status.Code != rpc.Code_CODE_OK {
log.Err(status.NewErrorFromCode(rpc.Code_CODE_OK, "gateway"))
continue
}
for _, inf := range infoFromProviders[i] {
if indirects[i] {
p := inf.Path
// TODO do we need to trim prefix here for relative references?
nestedInfos[p] = append(nestedInfos[p], inf)

for _, info := range resp.Infos {
if p, ok := nestedInfos[info.Path]; ok {
// Since more than one providers contribute to this path,
// use a generic ID
p.Id = &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
}
// TODO(ishank011): aggregrate properties such as etag, checksum, etc.
p.Size += info.Size
p.Mtime = utils.LaterTS(p.Mtime, info.Mtime)
p.Type = provider.ResourceType_RESOURCE_TYPE_CONTAINER
p.MimeType = "httpd/unix-directory"
} else {
infos = append(infos, inf)
nestedInfos[info.Path] = info
}
}
}

for k := range nestedInfos {
inf := &provider.ResourceInfo{
Id: &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Path: k,
Size: 0,
}
infos = append(infos, inf)
infos := make([]*provider.ResourceInfo, 0, len(nestedInfos))
for _, info := range nestedInfos {
infos = append(infos, info)
}

return &provider.ListContainerResponse{
Expand All @@ -1713,50 +1705,6 @@ func (s *svc) listContainer(ctx context.Context, req *provider.ListContainerRequ
}, nil
}

func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListContainerRequest, res *[]*provider.ResourceInfo, p *registry.ProviderInfo, ind *bool, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
return
}

if utils.IsAbsoluteReference(req.Ref) {
resPath := path.Clean(req.Ref.GetPath())
if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) {
// The path which we're supposed to list encompasses this provider
// so just return the first child and mark it as indirect
rel, err := filepath.Rel(resPath, p.ProviderPath)
if err != nil {
*e = err
return
}
parts := strings.Split(rel, "/")
p := path.Join(resPath, parts[0])
*ind = true
*res = []*provider.ResourceInfo{
{
Id: &provider.ResourceId{
StorageId: "/",
OpaqueId: uuid.New().String(),
},
Type: provider.ResourceType_RESOURCE_TYPE_CONTAINER,
Path: p,
Size: 0,
},
}
return
}
}

r, err := c.ListContainer(ctx, req)
if err != nil {
*e = errors.Wrap(err, "gateway: error calling ListContainer")
return
}
*res = r.Infos
}

func (s *svc) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
log := appctx.GetLogger(ctx)

Expand Down Expand Up @@ -2197,6 +2145,18 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
return res.Providers, nil
}

func getUniqueProviders(providers []*registry.ProviderInfo) []*registry.ProviderInfo {
unique := make(map[string]bool)
for _, p := range providers {
unique[p.Address] = true
}
p := make([]*registry.ProviderInfo, 0, len(unique))
for addr := range unique {
p = append(p, &registry.ProviderInfo{Address: addr})
}
return p
}

type etagWithTS struct {
Etag string
Timestamp time.Time
Expand Down
Loading

0 comments on commit 80b2065

Please sign in to comment.