indexer: introduce LookupCtx #3043

Merged
merged 5 commits into from
Jul 8, 2022
6 changes: 6 additions & 0 deletions changelog/unreleased/lookupctx.md
@@ -0,0 +1,6 @@
Enhancement: introduce LookupCtx for index interface

The index interface now has a new LookupCtx method that can look up multiple values in a single call, so we can look up multiple shares by ID more efficiently.
It also takes a context so we can pass the trace context on to the CS3 backend.

https://github.com/cs3org/reva/pull/3043
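
For illustration, a minimal caller-side sketch of the new method. The `lookupMany` helper and its `example` package are hypothetical; the `index.Index` interface and the `LookupCtx` signature are the ones added in this PR.

```go
package example

import (
	"context"

	"github.com/cs3org/reva/v2/pkg/storage/utils/indexer/index"
)

// lookupMany sketches the batched lookup: instead of calling Lookup once per
// value, all values are resolved in a single LookupCtx call, and the caller's
// context (e.g. carrying a trace span) is passed through to the index backend.
func lookupMany(ctx context.Context, idx index.Index, values ...string) ([]string, error) {
	matches, err := idx.LookupCtx(ctx, values...)
	if err != nil {
		// implementations return a NotFoundErr when none of the values are indexed
		return nil, err
	}
	return matches, nil
}
```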
32 changes: 17 additions & 15 deletions internal/http/services/owncloud/ocdav/propfind/propfind.go
@@ -366,26 +366,28 @@ func (p *Handler) propfindResponse(ctx context.Context, w http.ResponseWriter, r
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(ctx, "propfind_response")
defer span.End()

filters := make([]*link.ListPublicSharesRequest_Filter, 0, len(resourceInfos))
for i := range resourceInfos {
// the list of filters grows with every public link in a folder
filters = append(filters, publicshare.ResourceIDFilter(resourceInfos[i].Id))
}

client, err := p.getClient()
if err != nil {
log.Error().Err(err).Msg("error getting grpc client")
w.WriteHeader(http.StatusInternalServerError)
return
}

var linkshares map[string]struct{}
// public link access does not show share-types
// oc:share-type is not part of an allprops response
if namespace != "/public" {
// only fetch this if property was queried
for _, p := range pf.Prop {
if p.Space == net.NsOwncloud && (p.Local == "share-types" || p.Local == "permissions") {
for _, prop := range pf.Prop {
if prop.Space == net.NsOwncloud && (prop.Local == "share-types" || prop.Local == "permissions") {
filters := make([]*link.ListPublicSharesRequest_Filter, 0, len(resourceInfos))
for i := range resourceInfos {
// FIXME this is expensive
// the filters array grows by one for every file in a folder
// TODO store public links as grants on the storage, reassembling them here is too costly
// we can then add the filter if the file has share-types=3 in the opaque,
// same as user / group shares for share indicators
filters = append(filters, publicshare.ResourceIDFilter(resourceInfos[i].Id))
}
client, err := p.getClient()
if err != nil {
log.Error().Err(err).Msg("error getting grpc client")
w.WriteHeader(http.StatusInternalServerError)
return
}
listResp, err := client.ListPublicShares(ctx, &link.ListPublicSharesRequest{Filters: filters})
if err == nil {
linkshares = make(map[string]struct{}, len(listResp.Share))
36 changes: 18 additions & 18 deletions pkg/publicshare/manager/cs3/cs3.go
@@ -397,33 +397,37 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
return result, nil
}

tokensByResourceID := make(map[string]*provider.ResourceId)
for _, filter := range idFilter {
resourceID := filter.GetResourceId()
tokens, err := m.indexer.FindBy(&link.PublicShare{},
indexer.NewField("ResourceId", resourceIDToIndex(resourceID)),
)
if err != nil {
continue
var tokens []string
if len(idFilter) > 0 {
idFilters := make([]indexer.Field, 0, len(idFilter))
for _, filter := range idFilter {
resourceID := filter.GetResourceId()
idFilters = append(idFilters, indexer.NewField("ResourceId", resourceIDToIndex(resourceID)))
}
for _, token := range tokens {
tokensByResourceID[token] = resourceID
tokens, err = m.indexer.FindBy(&link.PublicShare{}, idFilters...)
if err != nil {
return nil, err
}
}

// statMem is used as a local cache to prevent statting resources which
// already have been checked.
statMem := make(map[string]struct{})
for token, resourceID := range tokensByResourceID {
for _, token := range tokens {
if _, handled := shareMem[token]; handled {
// We don't want to add a share multiple times when we added it
// already.
continue
}

if _, checked := statMem[resourceIDToIndex(resourceID)]; !checked {
s, err := m.getByToken(ctx, token)
if err != nil {
return nil, err
}

if _, checked := statMem[resourceIDToIndex(s.PublicShare.GetResourceId())]; !checked {
sReq := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: resourceID},
Ref: &provider.Reference{ResourceId: s.PublicShare.GetResourceId()},
}
sRes, err := m.gatewayClient.Stat(ctx, sReq)
if err != nil {
@@ -435,13 +439,9 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
if !sRes.Info.PermissionSet.ListGrants {
continue
}
statMem[resourceIDToIndex(resourceID)] = struct{}{}
statMem[resourceIDToIndex(s.PublicShare.GetResourceId())] = struct{}{}
}

s, err := m.getByToken(ctx, token)
if err != nil {
return nil, err
}
if publicshare.MatchesFilters(s.PublicShare, filters) {
result = append(result, &s.PublicShare)
shareMem[s.PublicShare.Token] = struct{}{}
53 changes: 46 additions & 7 deletions pkg/storage/utils/indexer/index/autoincrement.go
@@ -75,17 +75,56 @@ func (idx *Autoincrement) Init() error {

// Lookup exact lookup by value.
func (idx *Autoincrement) Lookup(v string) ([]string, error) {
searchPath := path.Join(idx.indexRootDir, v)
oldname, err := idx.storage.ResolveSymlink(context.Background(), searchPath)
if err != nil {
if os.IsNotExist(err) {
err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
return idx.LookupCtx(context.Background(), v)
}

// LookupCtx retrieves multiple exact values and allows passing in a context
func (idx *Autoincrement) LookupCtx(ctx context.Context, values ...string) ([]string, error) {
var allValues map[string]struct{}
if len(values) != 1 {
// prefetch all values with one request
entries, err := idx.storage.ReadDir(ctx, path.Join("/", idx.indexRootDir))
if err != nil {
return nil, err
}
// convert known values to set
allValues = make(map[string]struct{}, len(entries))
for _, e := range entries {
allValues[path.Base(e)] = struct{}{}
}
}

return nil, err
// convert requested values to set
valueSet := make(map[string]struct{}, len(values))
for _, v := range values {
valueSet[v] = struct{}{}
}

return []string{oldname}, nil
var matches = []string{}
for v := range valueSet {
if _, ok := allValues[v]; ok || len(allValues) == 0 {
oldname, err := idx.storage.ResolveSymlink(ctx, path.Join("/", idx.indexRootDir, v))
if err != nil {
continue
}
matches = append(matches, oldname)
}
}

if len(matches) == 0 {
var v string
switch len(values) {
case 0:
v = "none"
case 1:
v = values[0]
default:
v = "multiple"
}
return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
}

return matches, nil
}

// Add a new value to the index.
7 changes: 6 additions & 1 deletion pkg/storage/utils/indexer/index/index.go
@@ -18,13 +18,18 @@

package index

import "github.com/cs3org/reva/v2/pkg/storage/utils/indexer/option"
import (
"context"

"github.com/cs3org/reva/v2/pkg/storage/utils/indexer/option"
)

// Index can be implemented to create new indexer-strategies. See Unique for example.
// Each indexer implementation is bound to one data-column (IndexBy) and a data-type (TypeName)
type Index interface {
Init() error
Lookup(v string) ([]string, error)
LookupCtx(ctx context.Context, v ...string) ([]string, error)
Add(id, v string) (string, error)
Remove(id string, v string) error
Update(id, oldV, newV string) error
57 changes: 49 additions & 8 deletions pkg/storage/utils/indexer/index/non_unique.go
@@ -79,24 +79,65 @@ func (idx *NonUnique) Init() error {

// Lookup exact lookup by value.
func (idx *NonUnique) Lookup(v string) ([]string, error) {
if idx.caseInsensitive {
v = strings.ToLower(v)
}
paths, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir, v))
return idx.LookupCtx(context.Background(), v)
}

// LookupCtx retrieves multiple exact values and allows passing in a context
func (idx *NonUnique) LookupCtx(ctx context.Context, values ...string) ([]string, error) {
// prefetch all values with one request
entries, err := idx.storage.ReadDir(ctx, path.Join("/", idx.indexRootDir))
if err != nil {
return nil, err
}
// convert known values to set
allValues := make(map[string]struct{}, len(entries))
for _, e := range entries {
allValues[path.Base(e)] = struct{}{}
}

var matches = make([]string, 0)
for _, p := range paths {
matches = append(matches, path.Base(p))
// convert requested values to set
valueSet := make(map[string]struct{}, len(values))
if idx.caseInsensitive {
for _, v := range values {
valueSet[strings.ToLower(v)] = struct{}{}
}
} else {
for _, v := range values {
valueSet[v] = struct{}{}
}
}

var matches = map[string]struct{}{}
for v := range valueSet {
if _, ok := allValues[v]; ok {
children, err := idx.storage.ReadDir(ctx, path.Join("/", idx.indexRootDir, v))
if err != nil {
continue
}
for _, c := range children {
matches[path.Base(c)] = struct{}{}
}
}
}

if len(matches) == 0 {
var v string
switch len(values) {
case 0:
v = "none"
case 1:
v = values[0]
default:
v = "multiple"
}
return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
}

return matches, nil
ret := make([]string, 0, len(matches))
for m := range matches {
ret = append(ret, m)
}
return ret, nil
}

// Add a new value to the index.
4 changes: 3 additions & 1 deletion pkg/storage/utils/indexer/index/non_unique_test.go
@@ -37,7 +37,9 @@ func TestNonUniqueIndexAdd(t *testing.T) {

ids, err := sut.Lookup("Green")
assert.NoError(t, err)
assert.EqualValues(t, []string{"goefe-789", "xadaf-189"}, ids)
assert.Len(t, ids, 2)
assert.Contains(t, ids, "goefe-789")
assert.Contains(t, ids, "xadaf-189")

ids, err = sut.Lookup("White")
assert.NoError(t, err)
58 changes: 50 additions & 8 deletions pkg/storage/utils/indexer/index/unique.go
@@ -74,20 +74,62 @@ func (idx *Unique) Init() error {

// Lookup exact lookup by value.
func (idx *Unique) Lookup(v string) ([]string, error) {
return idx.LookupCtx(context.Background(), v)
}

// LookupCtx retrieves multiple exact values and allows passing in a context
func (idx *Unique) LookupCtx(ctx context.Context, values ...string) ([]string, error) {
var allValues map[string]struct{}
if len(values) != 1 {
// prefetch all values with one request
entries, err := idx.storage.ReadDir(ctx, path.Join("/", idx.indexRootDir))
if err != nil {
return nil, err
}
// convert known values to set
allValues = make(map[string]struct{}, len(entries))
for _, e := range entries {
allValues[path.Base(e)] = struct{}{}
}
}

// convert requested values to set
valueSet := make(map[string]struct{}, len(values))
if idx.caseInsensitive {
v = strings.ToLower(v)
for _, v := range values {
valueSet[strings.ToLower(v)] = struct{}{}
}
} else {
for _, v := range values {
valueSet[v] = struct{}{}
}
}
searchPath := path.Join(idx.indexRootDir, v)
oldname, err := idx.storage.ResolveSymlink(context.Background(), searchPath)
if err != nil {
if os.IsNotExist(err) {
err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}

var matches = make([]string, 0)
for v := range valueSet {
if _, ok := allValues[v]; ok || len(allValues) == 0 {
oldname, err := idx.storage.ResolveSymlink(ctx, path.Join(idx.indexRootDir, v))
if err != nil {
continue
}
matches = append(matches, oldname)
}
}

return nil, err
if len(matches) == 0 {
var v string
switch len(values) {
case 0:
v = "none"
case 1:
v = values[0]
default:
v = "multiple"
}
return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
}

return []string{oldname}, nil
return matches, nil
}

// Add adds a value to the index, returns the path to the root-document
20 changes: 17 additions & 3 deletions pkg/storage/utils/indexer/indexer.go
@@ -166,9 +166,14 @@ func (i *StorageIndexer) FindBy(t interface{}, queryFields ...Field) ([]string,

resultPaths := make(map[string]struct{})
if fields, ok := i.indices[typeName]; ok {
for _, field := range queryFields {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field.Name)] {
res, err := idx.Lookup(field.Value)
for fieldName, queryFields := range groupFieldsByName(queryFields) {
idxes := fields.IndicesByField[strcase.ToCamel(fieldName)]
values := make([]string, 0, len(queryFields))
for _, f := range queryFields {
values = append(values, f.Value)
}
for _, idx := range idxes {
res, err := idx.LookupCtx(context.Background(), values...)
Comment on lines -169 to +176 (Contributor Author):

here we no longer make one Lookup call per queryField but instead aggregate by field name and then use the new LookupCtx to fetch all results at once.

I introduced the context because we want to be able to pass the trace context into the index as well ...
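
For illustration, a hedged sketch of the caller side described in this comment. The helper is hypothetical and assumed to live in pkg/publicshare/manager/cs3 with its existing imports; FindBy, NewField and resourceIDToIndex are the functions used in this PR.

```go
// findByResourceIDs collects all ResourceId filters into one FindBy call; the
// indexer groups them by field name and resolves them with a single LookupCtx
// call per matching index instead of one Lookup per filter.
func (m *Manager) findByResourceIDs(ids ...*provider.ResourceId) ([]string, error) {
	fields := make([]indexer.Field, 0, len(ids))
	for _, id := range ids {
		fields = append(fields, indexer.NewField("ResourceId", resourceIDToIndex(id)))
	}
	return m.indexer.FindBy(&link.PublicShare{}, fields...)
}
```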

if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
continue
Expand All @@ -193,6 +198,15 @@ func (i *StorageIndexer) FindBy(t interface{}, queryFields ...Field) ([]string,
return result, nil
}

// groupFieldsByName groups the given filters and returns a map using the filter type as the key.
func groupFieldsByName(queryFields []Field) map[string][]Field {
grouped := make(map[string][]Field)
for _, f := range queryFields {
grouped[f.Name] = append(grouped[f.Name], f)
}
return grouped
}

// Delete deletes all indexed fields of a given type t on the Indexer.
func (i *StorageIndexer) Delete(t interface{}) error {
typeName := getTypeFQN(t)