Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jsoncs3 should use gateway selector and log from context #4608

Merged
merged 1 commit into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: use gateway selector in jsoncs3

The jsoncs3 user share manager now uses the gateway selector to get a fresh client before making requests and uses the configured logger from the context.

https://github.com/cs3org/reva/pull/4608
116 changes: 78 additions & 38 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ import (
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
Expand All @@ -52,6 +44,13 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

/*
Expand Down Expand Up @@ -153,8 +152,8 @@ type Manager struct {

MaxConcurrency int

gateway gatewayv1beta1.GatewayAPIClient
eventStream events.Stream
gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient]
eventStream events.Stream
}

// NewDefault returns a new manager instance with default dependencies
Expand All @@ -170,7 +169,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
return nil, err
}

gc, err := pool.GetGatewayServiceClient(c.GatewayAddr)
gatewaySelector, err := pool.GatewaySelector(c.GatewayAddr)
if err != nil {
return nil, err
}
Expand All @@ -183,19 +182,19 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
}
}

return New(s, gc, c.CacheTTL, es, c.MaxConcurrency)
return New(s, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency)
}

// New returns a new manager instance.
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{
Cache: providercache.New(s, ttl),
CreatedCache: sharecache.New(s, "users", "created.json", ttl),
UserReceivedStates: receivedsharecache.New(s, ttl),
GroupReceivedCache: sharecache.New(s, "groups", "received.json", ttl),
storage: s,
gateway: gc,
gatewaySelector: gatewaySelector,
eventStream: es,
MaxConcurrency: maxconcurrency,
}, nil
Expand Down Expand Up @@ -411,6 +410,7 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
defer span.End()
sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "GetShare").Logger()
if err := m.initialize(ctx); err != nil {
return nil, err
}
Expand All @@ -421,7 +421,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
Expand All @@ -432,7 +432,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to publish share expired event")
}
}
Expand All @@ -445,8 +445,15 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc

req := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: s.ResourceId},
FieldMask: &fieldmaskpb.FieldMask{
Paths: []string{"permissions"},
},
}
res, err := m.gateway.Stat(ctx, req)
client, err := m.gatewaySelector.Next()
if err != nil {
return nil, err
}
res, err := client.Stat(ctx, req)
if err == nil &&
res.Status.Code == rpcv1beta1.Code_CODE_OK &&
res.Info.PermissionSet.ListGrants {
Expand Down Expand Up @@ -523,8 +530,15 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
if !share.IsCreatedByUser(toUpdate, user) {
req := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: toUpdate.ResourceId},
FieldMask: &fieldmaskpb.FieldMask{
Paths: []string{"permissions"},
},
}
client, err := m.gatewaySelector.Next()
if err != nil {
return nil, err
}
res, err := m.gateway.Stat(ctx, req)
res, err := client.Stat(ctx, req)
if err != nil ||
res.Status.Code != rpcv1beta1.Code_CODE_OK ||
!res.Info.PermissionSet.UpdateGrant {
Expand Down Expand Up @@ -583,6 +597,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listSharesByIDs")
defer span.End()
sublog := appctx.GetLogger(ctx).With().Str("userid", user.GetId().GetOpaqueId()).Str("useridp", user.GetId().GetIdp()).Str("driver", "jsoncs3").Str("handler", "listSharesByIDs").Logger()

providerSpaces := make(map[string]map[string]struct{})
for _, f := range share.FilterFiltersByType(filters, collaboration.Filter_TYPE_RESOURCE_ID) {
Expand All @@ -604,19 +619,21 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}

for _, s := range shares.Shares {
resourceID := s.GetResourceId()
sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ItemID: resourceID,
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
Expand All @@ -626,17 +643,33 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}

if !(share.IsCreatedByUser(s, user) || share.IsGrantedToUser(s, user)) {
key := storagespace.FormatResourceID(*s.ResourceId)
key := storagespace.FormatResourceID(*resourceID)
if _, hit := statCache[key]; !hit {
req := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: s.ResourceId},
Ref: &provider.Reference{ResourceId: resourceID},
FieldMask: &fieldmaskpb.FieldMask{
Paths: []string{"permissions"},
},
}
client, err := m.gatewaySelector.Next()
if err != nil {
sublog.Error().Err(err).Msg("failed to select next gateway client")
continue
}
res, err := client.Stat(ctx, req)
if err != nil {
sublog.Error().Err(err).Msg("failed to make stat call")
continue
}
res, err := m.gateway.Stat(ctx, req)
if err != nil ||
res.Status.Code != rpcv1beta1.Code_CODE_OK ||
!res.Info.PermissionSet.ListGrants {
if res.Status.Code != rpcv1beta1.Code_CODE_OK {
sublog.Debug().Str("code", res.GetStatus().GetCode().String()).Msg(res.GetStatus().GetMessage())
continue
}
if !res.Info.PermissionSet.ListGrants {
sublog.Debug().Msg("user has no list grants permission")
continue
}
sublog.Debug().Msg("listing share for non participating user")
statCache[key] = struct{}{}
}
}
Expand All @@ -652,6 +685,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares")
defer span.End()
sublog := appctx.GetLogger(ctx).With().Str("userid", user.GetId().GetOpaqueId()).Str("useridp", user.GetId().GetIdp()).Str("driver", "jsoncs3").Str("handler", "listCreatedShares").Logger()

list, err := m.CreatedCache.List(ctx, user.Id.OpaqueId)
if err != nil {
Expand Down Expand Up @@ -694,7 +728,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
// fetch all shares from space with one request
_, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Str("storageid", storageID).
Str("spaceid", spaceID).
Msg("failed to list shares in space")
Expand All @@ -707,7 +741,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
Expand All @@ -717,7 +751,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
Expand Down Expand Up @@ -762,14 +796,19 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter, forUser *userv1beta1.UserId) ([]*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
defer span.End()
sublog := appctx.GetLogger(ctx).With().Str("driver", "jsoncs3").Str("handler", "ListReceivedShares").Logger()

if err := m.initialize(ctx); err != nil {
return nil, err
}

user := ctxpkg.ContextMustGetUser(ctx)
if user.GetId().GetType() == userv1beta1.UserType_USER_TYPE_SERVICE {
u, err := utils.GetUser(forUser, m.gateway)
client, err := m.gatewaySelector.Next()
if err != nil {
return nil, err
}
u, err := utils.GetUser(forUser, client)
if err != nil {
return nil, errtypes.BadRequest("user not found")
}
Expand Down Expand Up @@ -852,12 +891,11 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
sublogr := sublog.With().Str("storageid", storageID).Str("spaceid", spaceID).Logger()
// fetch all shares from space with one request
_, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).
Str("storageid", storageID).
Str("spaceid", spaceID).
sublogr.Error().Err(err).
Msg("failed to list shares in space")
continue
}
Expand All @@ -866,9 +904,10 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
if err != nil || s == nil {
continue
}
sublogr = sublogr.With().Str("shareid", shareID).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
sublogr.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
Expand All @@ -878,7 +917,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
sublogr.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
Expand Down Expand Up @@ -959,6 +998,7 @@ func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.Share
func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived")
defer span.End()
sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "getReceived").Logger()

s, err := m.get(ctx, ref)
if err != nil {
Expand All @@ -970,7 +1010,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
Expand All @@ -980,7 +1020,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
sublog.Error().Err(err).
Msg("failed to publish share expired event")
}
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/share/manager/jsoncs3/jsoncs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path/filepath"
"sync"

gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
groupv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/conversions"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
sharespkg "github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache"
Expand All @@ -41,6 +43,7 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/cs3org/reva/v2/tests/cs3mocks/mocks"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/fieldmaskpb"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -153,8 +156,16 @@ var _ = Describe("Jsoncs3", func() {
storage, err = metadata.NewDiskStorage(tmpdir)
Expect(err).ToNot(HaveOccurred())

pool.RemoveSelector("GatewaySelector" + "com.owncloud.api.gateway")
client = &mocks.GatewayAPIClient{}
m, err = jsoncs3.New(storage, client, 0, nil, 0)
gatewaySelector := pool.GetSelector[gatewayv1beta1.GatewayAPIClient](
"GatewaySelector",
"com.owncloud.api.gateway",
func(cc *grpc.ClientConn) gatewayv1beta1.GatewayAPIClient {
return client
},
)
m, err = jsoncs3.New(storage, gatewaySelector, 0, nil, 0)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
Loading