Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add trace details to jsoncs3 #3960

Merged
merged 1 commit into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/jsoncs3-trace-span-details.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bugfix: add trace span details

https://github.com/cs3org/reva/pull/3960
74 changes: 61 additions & 13 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

Expand Down Expand Up @@ -230,46 +231,61 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
}, nil
}

func (m *Manager) initialize() error {
func (m *Manager) initialize(ctx context.Context) error {
_, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "initialize")
defer span.End()
if m.initialized {
span.SetStatus(codes.Ok, "already initialized")
return nil
}

m.Lock()
defer m.Unlock()

if m.initialized { // check if initialization happened while grabbing the lock
span.SetStatus(codes.Ok, "initialized while grabbing lock")
return nil
}

ctx := context.Background()
ctx = context.Background()
err := m.storage.Init(ctx, "jsoncs3-share-manager-metadata")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

err = m.storage.MakeDirIfNotExist(ctx, "storages")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "users")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "groups")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

m.initialized = true
span.SetStatus(codes.Ok, "initialized")
return nil
}

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

Expand All @@ -280,7 +296,10 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
// TODO: should this not already be caught at the gw level?
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER &&
(utils.UserEqual(g.Grantee.GetUserId(), user.Id) || utils.UserEqual(g.Grantee.GetUserId(), md.Owner)) {
return nil, errtypes.BadRequest("jsoncs3: owner/creator and grantee are the same")
err := errtypes.BadRequest("jsoncs3: owner/creator and grantee are the same")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

// check if share already exists.
Expand All @@ -295,7 +314,10 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
_, err := m.getByKey(ctx, key)
if err == nil {
// share already exists
return nil, errtypes.AlreadyExists(key.String())
err := errtypes.AlreadyExists(key.String())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

shareID := shareid.Encode(md.GetId().GetStorageId(), md.GetId().GetSpaceId(), uuid.NewString())
Expand All @@ -316,24 +338,32 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

Expand All @@ -350,29 +380,38 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
groupid := g.Grantee.GetGroupId().GetOpaqueId()
err := m.GroupReceivedCache.Add(ctx, groupid, shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.GroupReceivedCache.Add(ctx, groupid, shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
}

span.SetStatus(codes.Ok, "")
return s, nil
}

Expand Down Expand Up @@ -425,7 +464,7 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -478,7 +517,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return err
}

Expand All @@ -504,7 +543,7 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -586,7 +625,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -622,6 +661,8 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
for spaceID := range spaces {
err := m.Cache.Sync(ctx, providerID, spaceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

Expand Down Expand Up @@ -669,6 +710,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}
}
}
span.SetStatus(codes.Ok, "")
return ss, nil
}

Expand All @@ -679,6 +721,8 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
var ss []*collaboration.Share

if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return ss, err
}
for ssid, spaceShareIDs := range m.CreatedCache.List(user.Id.OpaqueId) {
Expand Down Expand Up @@ -718,6 +762,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
}
}

span.SetStatus(codes.Ok, "")
return ss, nil
}

Expand All @@ -726,7 +771,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -870,9 +915,12 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}

if err := g.Wait(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

span.SetStatus(codes.Ok, "")
return rss, nil
}

Expand All @@ -899,7 +947,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S

// GetReceivedShare returns the information for a received share.
func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -944,7 +992,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -997,7 +1045,7 @@ func updateShareID(share *collaboration.Share) {
// Load imports shares and received shares from channels (e.g. during migration)
func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error {
log := appctx.GetLogger(ctx)
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return err
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
span.SetStatus(codes.Ok, "no shares in provider or space")
return nil
}

Expand All @@ -178,11 +179,15 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
createdBytes, err := json.Marshal(c.Providers[storageID].Spaces[spaceID])
if err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := spaceJSONPath(storageID, spaceID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

Expand All @@ -192,8 +197,11 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
IfUnmodifiedSince: c.Providers[storageID].Spaces[spaceID].Mtime,
}); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
span.SetAttributes(attribute.String("cs3.userid", userID))

if c.ReceivedSpaces[userID] == nil {
span.SetStatus(codes.Ok, "no received shares")
return nil
}

Expand All @@ -188,11 +189,15 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
createdBytes, err := json.Marshal(c.ReceivedSpaces[userID])
if err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := userJSONPath(userID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

Expand All @@ -202,8 +207,11 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
IfUnmodifiedSince: c.ReceivedSpaces[userID].Mtime,
}); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/share/manager/jsoncs3/sharecache/sharecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
createdBytes, err := json.Marshal(c.UserShares[userid])
if err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := c.userCreatedPath(userid)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

Expand All @@ -231,8 +235,11 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
IfUnmodifiedSince: c.UserShares[userid].Mtime,
}); err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down