From 9a7b644e220f66e01d6f84e3f7ef0b2d09cd71b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 2 Dec 2024 08:31:22 +0100 Subject: [PATCH 1/4] Fix gateway address in jsoncs3 config --- ocis/pkg/command/migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocis/pkg/command/migrate.go b/ocis/pkg/command/migrate.go index c30deee002b..aa845395e27 100644 --- a/ocis/pkg/command/migrate.go +++ b/ocis/pkg/command/migrate.go @@ -519,7 +519,7 @@ func revaShareConfig(cfg *sharing.Config) map[string]interface{} { "machine_auth_apikey": cfg.UserSharingDrivers.CS3.SystemUserAPIKey, }, "jsoncs3": map[string]interface{}{ - "gateway_addr": cfg.UserSharingDrivers.JSONCS3.ProviderAddr, + "gateway_addr": cfg.Reva.Address, "provider_addr": cfg.UserSharingDrivers.JSONCS3.ProviderAddr, "service_user_id": cfg.UserSharingDrivers.JSONCS3.SystemUserID, "service_user_idp": cfg.UserSharingDrivers.JSONCS3.SystemUserIDP, From 38e54515b02743dd7df5519cd64655fcac9faece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 2 Dec 2024 09:16:58 +0100 Subject: [PATCH 2/4] Add a "ocis shares cleanup" command --- ocis/pkg/command/shares.go | 122 +++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 ocis/pkg/command/shares.go diff --git a/ocis/pkg/command/shares.go b/ocis/pkg/command/shares.go new file mode 100644 index 00000000000..a6b1795b84d --- /dev/null +++ b/ocis/pkg/command/shares.go @@ -0,0 +1,122 @@ +package command + +import ( + "errors" + + "github.com/urfave/cli/v2" + + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3" + "github.com/cs3org/reva/v2/pkg/share/manager/registry" + "github.com/cs3org/reva/v2/pkg/utils" + + "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/config/parser" + mregistry "github.com/owncloud/ocis/v2/ocis-pkg/registry" + "github.com/owncloud/ocis/v2/ocis/pkg/register" + sharingparser "github.com/owncloud/ocis/v2/services/sharing/pkg/config/parser" +) + +// SharesCommand is the entrypoint for the groups command. +func SharesCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "shares", + Usage: `cli tools to manage entries in the share manager.`, + Category: "maintenance", + Before: func(c *cli.Context) error { + // Parse base config + if err := parser.ParseConfig(cfg, true); err != nil { + return configlog.ReturnError(err) + } + + // Parse sharing config + cfg.Sharing.Commons = cfg.Commons + return configlog.ReturnError(sharingparser.ParseConfig(cfg.Sharing)) + }, + Subcommands: []*cli.Command{ + cleanupCmd(cfg), + }, + } +} + +func init() { + register.AddCommand(SharesCommand) +} + +func cleanupCmd(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "cleanup", + Usage: `clean up stale entries in the share manager.`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "service-account-id", + Value: "", + Usage: "Name of the service account to use for the cleanup", + EnvVars: []string{"OCIS_SERVICE_ACCOUNT_ID"}, + Required: true, + }, + &cli.StringFlag{ + Name: "service-account-secret", + Value: "", + Usage: "Secret for the service account", + EnvVars: []string{"OCIS_SERVICE_ACCOUNT_SECRET"}, + Required: true, + }, + }, + Before: func(c *cli.Context) error { + // Parse base config + if err := parser.ParseConfig(cfg, true); err != nil { + return configlog.ReturnError(err) + } + + // Parse sharing config + cfg.Sharing.Commons = cfg.Commons + return configlog.ReturnError(sharingparser.ParseConfig(cfg.Sharing)) + }, + Action: func(c *cli.Context) error { + return cleanup(c, cfg) + }, + } +} + +func cleanup(c *cli.Context, cfg *config.Config) error { + driver := cfg.Sharing.UserSharingDriver + // cleanup is only implemented for the jsoncs3 share manager + if driver != "jsoncs3" { + return configlog.ReturnError(errors.New("cleanup is only implemented for the jsoncs3 share manager")) + } + + rcfg := revaShareConfig(cfg.Sharing) + f, ok := registry.NewFuncs[driver] + if !ok { + return configlog.ReturnError(errors.New("Unknown share manager type '" + driver + "'")) + } + mgr, err := f(rcfg[driver].(map[string]interface{})) + if err != nil { + return configlog.ReturnError(err) + } + + // Initialize registry to make service lookup work + _ = mregistry.GetRegistry() + + // get an authenticated context + gatewaySelector, err := pool.GatewaySelector(cfg.Sharing.Reva.Address) + if err != nil { + return configlog.ReturnError(err) + } + + client, err := gatewaySelector.Next() + if err != nil { + return configlog.ReturnError(err) + } + + serviceUserCtx, err := utils.GetServiceUserContext(c.String("service-account-id"), client, c.String("service-account-secret")) + if err != nil { + return configlog.ReturnError(err) + } + + mgr.(*jsoncs3.Manager).CleanupStaleShares(serviceUserCtx) + + return nil +} From 856a0794d7233477dc0841f3f209d93eca0a6dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 3 Dec 2024 10:09:55 +0100 Subject: [PATCH 3/4] Pass an initialized logger to reva --- ocis/pkg/command/shares.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ocis/pkg/command/shares.go b/ocis/pkg/command/shares.go index a6b1795b84d..20d46650135 100644 --- a/ocis/pkg/command/shares.go +++ b/ocis/pkg/command/shares.go @@ -3,6 +3,7 @@ package command import ( "errors" + "github.com/rs/zerolog" "github.com/urfave/cli/v2" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" @@ -116,6 +117,11 @@ func cleanup(c *cli.Context, cfg *config.Config) error { return configlog.ReturnError(err) } + l := logger() + + zerolog.SetGlobalLevel(zerolog.InfoLevel) + serviceUserCtx = l.WithContext(serviceUserCtx) + mgr.(*jsoncs3.Manager).CleanupStaleShares(serviceUserCtx) return nil From a925d09bbea5bfcd9a961a695f45e5d1fe662dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 3 Dec 2024 10:11:20 +0100 Subject: [PATCH 4/4] Bump reva --- go.mod | 2 +- go.sum | 2 + .../usershareprovider/usershareprovider.go | 49 ++++-- .../http/services/owncloud/ocdav/proppatch.go | 2 + .../v2/pkg/share/manager/jsoncs3/jsoncs3.go | 156 ++++++++++++++++-- .../jsoncs3/providercache/providercache.go | 60 +++++++ .../receivedsharecache/receivedsharecache.go | 68 ++++++++ .../manager/jsoncs3/sharecache/sharecache.go | 5 + .../reva/v2/pkg/storage/fs/ocis/ocis.go | 4 +- .../reva/v2/pkg/storage/fs/posix/posix.go | 2 +- .../reva/v2/pkg/storage/fs/posix/tree/tree.go | 2 + .../reva/v2/pkg/storage/fs/s3ng/s3ng.go | 4 +- .../utils/decomposedfs/decomposedfs.go | 14 +- .../pkg/storage/utils/decomposedfs/grants.go | 14 ++ .../storage/utils/decomposedfs/metadata.go | 4 + .../decomposedfs/mtimesyncedcache/map.go | 9 + .../storage/utils/decomposedfs/node/locks.go | 26 ++- .../storage/utils/decomposedfs/node/node.go | 7 +- .../storage/utils/decomposedfs/node/xattrs.go | 2 + .../permissions/spacepermissions.go | 2 + .../pkg/storage/utils/decomposedfs/recycle.go | 9 +- .../storage/utils/decomposedfs/revisions.go | 10 ++ .../decomposedfs/tree/propagator/async.go | 6 +- .../storage/utils/decomposedfs/tree/tree.go | 36 +++- .../pkg/storage/utils/decomposedfs/upload.go | 4 + .../utils/decomposedfs/upload/session.go | 4 + .../utils/decomposedfs/upload/store.go | 8 +- .../utils/decomposedfs/upload/upload.go | 8 +- .../reva/v2/pkg/storage/utils/metadata/cs3.go | 6 +- vendor/modules.txt | 2 +- 30 files changed, 468 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index b188fd5027a..5327044ea47 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.11.0 github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1 - github.com/cs3org/reva/v2 v2.26.7 + github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533 github.com/davidbyttow/govips/v2 v2.15.0 github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e diff --git a/go.sum b/go.sum index 09281add430..94d12b1bbf3 100644 --- a/go.sum +++ b/go.sum @@ -257,6 +257,8 @@ github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1 h1:RU6LT6mkD16xZ github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1/go.mod h1:DedpcqXl193qF/08Y04IO0PpxyyMu8+GrkD6kWK2MEQ= github.com/cs3org/reva/v2 v2.26.7 h1:E5b1+H5ZsnmDgWWS/u3t4PtdmiMaY1bEEYVI/vE9xo8= github.com/cs3org/reva/v2 v2.26.7/go.mod h1:xC5N2XOrCRim/W55uyMsew8RwwFZbQ4hIaKshIbyToo= +github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533 h1:QshDjljk44ASolJwlHxE9e7u+Slgdi/VfPKYvbfFu2g= +github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533/go.mod h1:fJWmn7EkttWOWphZfiKdFOcHuthcUsU55aSN1VeTOhU= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/usershareprovider/usershareprovider.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/usershareprovider/usershareprovider.go index f84736abac2..803070412d4 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/usershareprovider/usershareprovider.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/usershareprovider/usershareprovider.go @@ -520,12 +520,7 @@ func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.Up isMountPointSet := slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathMountPoint) && req.GetShare().GetMountPoint().GetPath() != "" // we calculate a valid mountpoint only if the share should be accepted and the mount point is not set explicitly if isStateTransitionShareAccepted && !isMountPointSet { - gatewayClient, err := s.gatewaySelector.Next() - if err != nil { - return nil, err - } - - s, err := setReceivedShareMountPoint(ctx, gatewayClient, req) + s, err := s.setReceivedShareMountPoint(ctx, req) switch { case err != nil: fallthrough @@ -556,7 +551,11 @@ func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.Up } } -func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClient, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) { +func (s *service) setReceivedShareMountPoint(ctx context.Context, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) { + gwc, err := s.gatewaySelector.Next() + if err != nil { + return nil, err + } receivedShare, err := gwc.GetReceivedShare(ctx, &collaboration.GetReceivedShareRequest{ Ref: &collaboration.ShareReference{ Spec: &collaboration.ShareReference_Id{ @@ -575,6 +574,10 @@ func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClien return status.NewOK(ctx), nil } + gwc, err = s.gatewaySelector.Next() + if err != nil { + return nil, err + } resourceStat, err := gwc.Stat(ctx, &provider.StatRequest{ Ref: &provider.Reference{ ResourceId: receivedShare.GetShare().GetShare().GetResourceId(), @@ -592,11 +595,15 @@ func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClien var userID *userpb.UserId _ = utils.ReadJSONFromOpaque(req.Opaque, "userid", &userID) + receivedShares, err := s.sm.ListReceivedShares(ctx, []*collaboration.Filter{}, userID) + if err != nil { + return nil, err + } + // check if the requested mount point is available and if not, find a suitable one - availableMountpoint, _, err := GetMountpointAndUnmountedShares(ctx, gwc, + availableMountpoint, _, err := getMountpointAndUnmountedShares(ctx, receivedShares, s.gatewaySelector, nil, resourceStat.GetInfo().GetId(), resourceStat.GetInfo().GetName(), - userID, ) if err != nil { return status.NewInternal(ctx, err.Error()), nil @@ -620,7 +627,6 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI if userId != nil { listReceivedSharesReq.Opaque = utils.AppendJSONToOpaque(nil, "userid", userId) } - listReceivedSharesRes, err := gwc.ListReceivedShares(ctx, listReceivedSharesReq) if err != nil { return "", nil, errtypes.InternalError("grpc list received shares request failed") @@ -630,17 +636,30 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI return "", nil, err } + return getMountpointAndUnmountedShares(ctx, listReceivedSharesRes.GetShares(), nil, gwc, id, name) +} + +// GetMountpointAndUnmountedShares returns a new or existing mountpoint for the given info and produces a list of unmounted received shares for the same resource +func getMountpointAndUnmountedShares(ctx context.Context, receivedShares []*collaboration.ReceivedShare, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], gwc gateway.GatewayAPIClient, id *provider.ResourceId, name string) (string, []*collaboration.ReceivedShare, error) { + unmountedShares := []*collaboration.ReceivedShare{} base := filepath.Clean(name) mount := base existingMountpoint := "" - mountedShares := make([]string, 0, len(listReceivedSharesRes.GetShares())) + mountedShares := make([]string, 0, len(receivedShares)) var pathExists bool + var err error - for _, s := range listReceivedSharesRes.GetShares() { + for _, s := range receivedShares { resourceIDEqual := utils.ResourceIDEqual(s.GetShare().GetResourceId(), id) if resourceIDEqual && s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED { + if gatewaySelector != nil { + gwc, err = gatewaySelector.Next() + if err != nil { + return "", nil, err + } + } // a share to the resource already exists and is mounted, remembers the mount point _, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc) if err == nil { @@ -658,6 +677,12 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI mountedShares = append(mountedShares, s.GetMountPoint().GetPath()) if s.GetMountPoint().GetPath() == mount { // does the shared resource still exist? + if gatewaySelector != nil { + gwc, err = gatewaySelector.Next() + if err != nil { + return "", nil, err + } + } _, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc) if err == nil { pathExists = true diff --git a/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/proppatch.go b/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/proppatch.go index 6ead723f5eb..b6fcf1f97d3 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/proppatch.go +++ b/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/proppatch.go @@ -133,12 +133,14 @@ func (s *svc) handleProppatch(ctx context.Context, w http.ResponseWriter, r *htt rreq := &provider.UnsetArbitraryMetadataRequest{ Ref: ref, ArbitraryMetadataKeys: []string{""}, + LockId: requestLockToken(r), } sreq := &provider.SetArbitraryMetadataRequest{ Ref: ref, ArbitraryMetadata: &provider.ArbitraryMetadata{ Metadata: map[string]string{}, }, + LockId: requestLockToken(r), } acceptedProps := []xml.Name{} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go index e8e00dd4104..6e67ae2b2a7 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go @@ -34,6 +34,7 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/share" "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache" @@ -114,6 +115,12 @@ func init() { registry.Register("jsoncs3", NewDefault) } +var ( + _registeredEvents = []events.Unmarshaller{ + events.SpaceDeleted{}, + } +) + type config struct { GatewayAddr string `mapstructure:"gateway_addr"` MaxConcurrency int `mapstructure:"max_concurrency"` @@ -188,7 +195,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { // New returns a new manager instance. func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { ttl := time.Duration(ttlSeconds) * time.Second - return &Manager{ + + m := &Manager{ Cache: providercache.New(s, ttl), CreatedCache: sharecache.New(s, "users", "created.json", ttl), UserReceivedStates: receivedsharecache.New(s, ttl), @@ -197,7 +205,18 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate gatewaySelector: gatewaySelector, eventStream: es, MaxConcurrency: maxconcurrency, - }, nil + } + + // listen for events + if m.eventStream != nil { + ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...) + if err != nil { + appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events") + } + go m.ProcessEvents(ch) + } + + return m, nil } func (m *Manager) initialize(ctx context.Context) error { @@ -248,6 +267,22 @@ func (m *Manager) initialize(ctx context.Context) error { return nil } +func (m *Manager) ProcessEvents(ch <-chan events.Event) { + log := logger.New() + for event := range ch { + ctx := context.Background() + + if err := m.initialize(ctx); err != nil { + log.Error().Err(err).Msg("error initializing manager") + } + + if ev, ok := event.Event.(events.SpaceDeleted); ok { + log.Debug().Msgf("space deleted event: %v", ev) + go func() { m.purgeSpace(ctx, ev.ID) }() + } + } +} + // Share creates a new share func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share") @@ -420,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc return nil, err } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -485,7 +520,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference return errtypes.NotFound(ref.String()) } - return m.removeShare(ctx, s) + return m.removeShare(ctx, s, false) } // UpdateShare updates the mode of the given share. @@ -622,7 +657,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f resourceID := s.GetResourceId() sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger() if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -740,7 +775,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, continue } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -901,12 +936,18 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati } for shareID, state := range w.rspace.States { s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true) - if err != nil || s == nil { + if err != nil { + sublogr.Error().Err(err).Msg("could not retrieve share") + continue + } + if s == nil { + sublogr.Warn().Str("shareid", shareID).Msg("share not found. cleaning up") + _ = m.UserReceivedStates.Remove(ctx, user.Id.OpaqueId, w.ssid, shareID) continue } sublogr = sublogr.With().Str("shareid", shareID).Logger() if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublogr.Error().Err(err). Msg("failed to unshare expired share") } @@ -1009,7 +1050,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer return nil, errtypes.NotFound(ref.String()) } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -1136,24 +1177,107 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar return nil } -func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error { +func (m *Manager) purgeSpace(ctx context.Context, id *provider.StorageSpaceId) { + log := appctx.GetLogger(ctx) + storageID, spaceID := storagespace.SplitStorageID(id.OpaqueId) + + shares, err := m.Cache.ListSpace(ctx, storageID, spaceID) + if err != nil { + log.Error().Err(err).Msg("error listing shares in space") + return + } + + // iterate over all shares in the space and remove them + for _, share := range shares.Shares { + err := m.removeShare(ctx, share, true) + if err != nil { + log.Error().Err(err).Msg("error removing share") + } + } + + // remove all shares in the space + err = m.Cache.PurgeSpace(ctx, storageID, spaceID) + if err != nil { + log.Error().Err(err).Msg("error purging space") + } +} + +func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipSpaceCache bool) error { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare") defer span.End() eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) - err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) + if !skipSpaceCache { + eg.Go(func() error { + storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) + err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) - return err - }) + return err + }) + } eg.Go(func() error { // remove from created cache return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) }) - // TODO remove from grantee cache + eg.Go(func() error { + // remove from user received states + if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + return m.UserReceivedStates.Remove(ctx, s.GetGrantee().GetUserId().GetOpaqueId(), s.GetResourceId().GetStorageId()+shareid.IDDelimiter+s.GetResourceId().GetSpaceId(), s.Id.OpaqueId) + } else if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + return m.GroupReceivedCache.Remove(ctx, s.GetGrantee().GetGroupId().GetOpaqueId(), s.Id.OpaqueId) + } + return nil + }) return eg.Wait() } + +func (m *Manager) CleanupStaleShares(ctx context.Context) { + log := appctx.GetLogger(ctx) + + if err := m.initialize(ctx); err != nil { + return + } + + // list all shares + providers, err := m.Cache.All(ctx) + if err != nil { + log.Error().Err(err).Msg("error listing all shares") + return + } + + client, err := m.gatewaySelector.Next() + if err != nil { + log.Error().Err(err).Msg("could not get gateway client") + } + + providers.Range(func(storage string, spaces *providercache.Spaces) bool { + log.Info().Str("storage", storage).Interface("spaceCount", spaces.Spaces.Count()).Msg("checking storage") + + spaces.Spaces.Range(func(space string, shares *providercache.Shares) bool { + log.Info().Str("storage", storage).Str("space", space).Interface("shareCount", len(shares.Shares)).Msg("checking space") + + for _, s := range shares.Shares { + req := &provider.StatRequest{ + Ref: &provider.Reference{ResourceId: s.ResourceId, Path: "."}, + } + res, err := client.Stat(ctx, req) + if err != nil { + log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not stat shared resource") + } + if res.Status.Code == rpcv1beta1.Code_CODE_NOT_FOUND { + log.Info().Str("storage", storage).Str("space", space).Msg("shared resource does not exist anymore. cleaning up shares") + if err := m.removeShare(ctx, s, false); err != nil { + log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not remove share") + } + } + } + + return true + }) + + return true + }) +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go index 5286be783db..3631b96c569 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -25,6 +25,7 @@ import ( "os" "path" "path/filepath" + "strings" "sync" "time" @@ -319,6 +320,36 @@ func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string, ski return space.Shares[shareID], nil } +// All returns all entries in the storage +func (c *Cache) All(ctx context.Context) (*mtimesyncedcache.Map[string, *Spaces], error) { + ctx, span := tracer.Start(ctx, "All") + defer span.End() + + providers, err := c.storage.ListDir(ctx, "/storages") + if err != nil { + return nil, err + } + for _, provider := range providers { + storageID := provider.Name + spaces, err := c.storage.ListDir(ctx, path.Join("/storages", storageID)) + if err != nil { + return nil, err + } + for _, space := range spaces { + spaceID := strings.TrimSuffix(space.Name, ".json") + + unlock := c.LockSpace(spaceID) + span.AddEvent("got lock for space " + spaceID) + if err := c.syncWithLock(ctx, storageID, spaceID); err != nil { + return nil, err + } + unlock() + } + } + + return &c.Providers, nil +} + // ListSpace returns the list of shares in a given space func (c *Cache) ListSpace(ctx context.Context, storageID, spaceID string) (*Shares, error) { ctx, span := tracer.Start(ctx, "ListSpace") @@ -418,6 +449,35 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error { return nil } +// PurgeSpace removes a space from the cache +func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error { + ctx, span := tracer.Start(ctx, "PurgeSpace") + defer span.End() + + unlock := c.LockSpace(spaceID) + defer unlock() + span.AddEvent("got lock") + + if !c.isSpaceCached(storageID, spaceID) { + err := c.syncWithLock(ctx, storageID, spaceID) + if err != nil { + return err + } + } + + spaces, ok := c.Providers.Load(storageID) + if !ok { + return nil + } + newShares := &Shares{} + if space, ok := spaces.Spaces.Load(spaceID); ok { + newShares.Etag = space.Etag // keep the etag to allow overwriting the state on the server + } + spaces.Spaces.Store(spaceID, newShares) + + return c.Persist(ctx, storageID, spaceID) +} + func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error { ctx, span := tracer.Start(ctx, "syncWithLock") defer span.End() diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go index 8bb6bb6ad04..cc32bcbff55 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go @@ -185,6 +185,74 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat return rss.Spaces[spaceID].States[shareID], nil } +// Remove removes an entry from the cache +func (c *Cache) Remove(ctx context.Context, userID, spaceID, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock") + unlock := c.lockUser(userID) + span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + defer unlock() + + ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID)) + + persistFunc := func() error { + c.initializeIfNeeded(userID, spaceID) + + rss, _ := c.ReceivedSpaces.Load(userID) + receivedSpace := rss.Spaces[spaceID] + if receivedSpace.States == nil { + receivedSpace.States = map[string]*State{} + } + delete(receivedSpace.States, shareID) + if len(receivedSpace.States) == 0 { + delete(rss.Spaces, spaceID) + } + + return c.persist(ctx, userID) + } + + log := appctx.GetLogger(ctx).With(). + Str("hostname", os.Getenv("HOSTNAME")). + Str("userID", userID). + Str("spaceID", spaceID).Logger() + + var err error + for retries := 100; retries > 0; retries-- { + err = persistFunc() + switch err.(type) { + case nil: + span.SetStatus(codes.Ok, "") + return nil + case errtypes.Aborted: + log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...") + // this is the expected status code from the server when the if-match etag check fails + // continue with sync below + case errtypes.PreconditionFailed: + log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...") + // actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side + // continue with sync below + case errtypes.AlreadyExists: + log.Debug().Msg("already exists when persisting added received share. retrying...") + // CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call. + // Thas happens when the cache thinks there is no file. + // continue with sync below + default: + span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error())) + log.Error().Err(err).Msg("persisting added received share failed") + return err + } + if err := c.syncWithLock(ctx, userID); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.Error().Err(err).Msg("persisting added received share failed. giving up.") + return err + } + } + return err +} + // List returns a list of received shares for a given user // The return list is guaranteed to be thread-safe func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, error) { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go index f763dee0f85..518344f95ef 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -214,6 +214,11 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { log.Debug().Msg("precondition failed when persisting removed share: etag changed. retrying...") // actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side // continue with sync below + case errtypes.AlreadyExists: + log.Debug().Msg("file already existed when persisting removed share. retrying...") + // CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call. + // Thas happens when the cache thinks there is no file. + // continue with sync below default: span.SetStatus(codes.Error, fmt.Sprintf("persisting removed share failed. giving up: %s", err.Error())) log.Error().Err(err).Msg("persisting removed share failed") diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/ocis/ocis.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/ocis/ocis.go index f8ec0a85f3a..32d2766b581 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/ocis/ocis.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/ocis/ocis.go @@ -36,7 +36,7 @@ func init() { // New returns an implementation to of the storage.FS interface that talk to // a local filesystem. -func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (storage.FS, error) { +func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (storage.FS, error) { o, err := options.New(m) if err != nil { return nil, err @@ -47,5 +47,5 @@ func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (sto return nil, err } - return decomposedfs.NewDefault(m, bs, stream) + return decomposedfs.NewDefault(m, bs, stream, log) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/posix.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/posix.go index e6faca47b19..e5387602cd7 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/posix.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/posix.go @@ -134,7 +134,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s Trashbin: trashbin, } - dfs, err := decomposedfs.New(&o.Options, aspects) + dfs, err := decomposedfs.New(&o.Options, aspects, log) if err != nil { return nil, err } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree/tree.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree/tree.go index cbe0f392e29..18228ff548d 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree/tree.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree/tree.go @@ -704,6 +704,8 @@ func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string, // InitNewNode initializes a new node func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) { + _, span := tracer.Start(ctx, "InitNewNode") + defer span.End() // create folder structure (if needed) if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { return nil, err diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/s3ng.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/s3ng.go index d261ad0ea59..eb755127be8 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/s3ng.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/s3ng.go @@ -35,7 +35,7 @@ func init() { // New returns an implementation to of the storage.FS interface that talk to // a local filesystem. -func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (storage.FS, error) { +func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (storage.FS, error) { o, err := parseConfig(m) if err != nil { return nil, err @@ -59,5 +59,5 @@ func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (sto return nil, err } - return decomposedfs.NewDefault(m, bs, stream) + return decomposedfs.NewDefault(m, bs, stream, log) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go index 8b956408897..b1abd267f9a 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -35,6 +35,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/jellydator/ttlcache/v2" "github.com/pkg/errors" + "github.com/rs/zerolog" tusd "github.com/tus/tusd/v2/pkg/handler" microstore "go-micro.dev/v4/store" "go.opentelemetry.io/otel" @@ -125,10 +126,12 @@ type Decomposedfs struct { userSpaceIndex *spaceidindex.Index groupSpaceIndex *spaceidindex.Index spaceTypeIndex *spaceidindex.Index + + log *zerolog.Logger } // NewDefault returns an instance with default components -func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (storage.FS, error) { +func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, log *zerolog.Logger) (storage.FS, error) { o, err := options.New(m) if err != nil { return nil, err @@ -169,14 +172,12 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) ( Trashbin: &DecomposedfsTrashbin{}, } - return New(o, aspects) + return New(o, aspects, log) } // New returns an implementation of the storage.FS interface that talks to // a local filesystem. -func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) { - log := logger.New() - +func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (storage.FS, error) { err := aspects.Tree.Setup() if err != nil { log.Error().Err(err).Msg("could not setup tree") @@ -235,6 +236,7 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) { userSpaceIndex: userSpaceIndex, groupSpaceIndex: groupSpaceIndex, spaceTypeIndex: spaceTypeIndex, + log: log, } fs.sessionStore = upload.NewSessionStore(fs, aspects, o.Root, o.AsyncFileUploads, o.Tokens) if err = fs.trashbin.Setup(fs); err != nil { @@ -311,7 +313,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { keepUpload = true metrics.UploadSessionsAborted.Inc() case events.PPOutcomeContinue: - if err := session.Finalize(); err != nil { + if err := session.Finalize(ctx); err != nil { sublog.Error().Err(err).Msg("could not finalize upload") failed = true revertNodeMetadata = false diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/grants.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/grants.go index 167fb6a5d6c..0fd2ccf0387 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/grants.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/grants.go @@ -38,6 +38,8 @@ import ( // DenyGrant denies access to a resource. func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference, grantee *provider.Grantee) error { + _, span := tracer.Start(ctx, "DenyGrant") + defer span.End() log := appctx.GetLogger(ctx) log.Debug().Interface("ref", ref).Interface("grantee", grantee).Msg("DenyGrant()") @@ -74,6 +76,8 @@ func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference, // AddGrant adds a grant to a resource func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + _, span := tracer.Start(ctx, "AddGrant") + defer span.End() log := appctx.GetLogger(ctx) log.Debug().Interface("ref", ref).Interface("grant", g).Msg("AddGrant()") grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g) @@ -119,6 +123,8 @@ func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g // ListGrants lists the grants on the specified resource func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) (grants []*provider.Grant, err error) { + _, span := tracer.Start(ctx, "ListGrants") + defer span.End() var grantNode *node.Node if grantNode, err = fs.lu.NodeFromResource(ctx, ref); err != nil { return @@ -174,6 +180,8 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) // RemoveGrant removes a grant from resource func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + _, span := tracer.Start(ctx, "RemoveGrant") + defer span.End() grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g) if err != nil { return err @@ -235,6 +243,8 @@ func isShareGrant(ctx context.Context) bool { // UpdateGrant updates a grant on a resource // TODO remove AddGrant or UpdateGrant grant from CS3 api, redundant? tracked in https://github.com/cs3org/cs3apis/issues/92 func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) error { + _, span := tracer.Start(ctx, "UpdateGrant") + defer span.End() log := appctx.GetLogger(ctx) log.Debug().Interface("ref", ref).Interface("grant", g).Msg("UpdateGrant()") @@ -272,6 +282,8 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference // checks if the given grant exists and returns it. Nil grant means it doesn't exist func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, metadata.UnlockFunc, *provider.Grant, error) { + _, span := tracer.Start(ctx, "loadGrant") + defer span.End() n, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { return nil, nil, nil, err @@ -308,6 +320,8 @@ func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, } func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provider.Grant) error { + _, span := tracer.Start(ctx, "storeGrant") + defer span.End() // if is a grant to a space root, the receiver needs the space type to update the indexes spaceType, ok := storageprovider.SpaceTypeFromContext(ctx) if !ok { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go index 41a44a75f1d..8d7cb73bc38 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go @@ -38,6 +38,8 @@ import ( // SetArbitraryMetadata sets the metadata on a resource func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) { + _, span := tracer.Start(ctx, "SetArbitraryMetadata") + defer span.End() n, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { return errors.Wrap(err, "Decomposedfs: error resolving ref") @@ -131,6 +133,8 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. // UnsetArbitraryMetadata unsets the metadata on the given resource func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) { + _, span := tracer.Start(ctx, "UnsetArbitraryMetadata") + defer span.End() n, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { return errors.Wrap(err, "Decomposedfs: error resolving ref") diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go index 830634fc23a..aa1d0cdd24a 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go @@ -34,3 +34,12 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) { } func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) } + +func (m *Map[K, V]) Count() int { + l := 0 + m.Range(func(_ K, _ V) bool { + l++ + return true + }) + return l +} diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/locks.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/locks.go index 3e0d3656dfb..88e05187c53 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/locks.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/locks.go @@ -38,6 +38,8 @@ import ( // SetLock sets a lock on the node func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error { + ctx, span := tracer.Start(ctx, "SetLock") + defer span.End() lockFilePath := n.LockFilePath() // ensure parent path exists @@ -89,22 +91,31 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error { // ReadLock reads the lock id for a node func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, error) { + ctx, span := tracer.Start(ctx, "ReadLock") + defer span.End() // ensure parent path exists - if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + _, subspan := tracer.Start(ctx, "os.MkdirAll") + err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700) + subspan.End() + if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error creating parent folder for lock") } // the caller of ReadLock already may hold a file lock if !skipFileLock { + _, subspan := tracer.Start(ctx, "filelocks.AcquireReadLock") fileLock, err := filelocks.AcquireReadLock(n.InternalPath()) + subspan.End() if err != nil { return nil, err } defer func() { + _, subspan := tracer.Start(ctx, "filelocks.ReleaseLock") rerr := filelocks.ReleaseLock(fileLock) + subspan.End() // if err is non nil we do not overwrite that if err == nil { @@ -113,7 +124,10 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, }() } + _, subspan = tracer.Start(ctx, "os.Open") f, err := os.Open(n.LockFilePath()) + subspan.End() + if err != nil { if errors.Is(err, fs.ErrNotExist) { return nil, errtypes.NotFound("no lock found") @@ -130,7 +144,11 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, // lock already expired if lock.Expiration != nil && time.Now().After(time.Unix(int64(lock.Expiration.Seconds), int64(lock.Expiration.Nanos))) { - if err = os.Remove(f.Name()); err != nil { + + _, subspan = tracer.Start(ctx, "os.Remove") + err = os.Remove(f.Name()) + subspan.End() + if err != nil { return nil, errors.Wrap(err, "Decomposedfs: could not remove expired lock file") } // we successfully deleted the expired lock @@ -142,6 +160,8 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, // RefreshLock refreshes the node's lock func (n *Node) RefreshLock(ctx context.Context, lock *provider.Lock, existingLockID string) error { + ctx, span := tracer.Start(ctx, "RefreshLock") + defer span.End() // ensure parent path exists if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { @@ -204,6 +224,8 @@ func (n *Node) RefreshLock(ctx context.Context, lock *provider.Lock, existingLoc // Unlock unlocks the node func (n *Node) Unlock(ctx context.Context, lock *provider.Lock) error { + ctx, span := tracer.Start(ctx, "Unlock") + defer span.End() // ensure parent path exists if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go index 795ce65c0e4..7dab2f499e0 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go @@ -218,6 +218,8 @@ func (n *Node) MarshalJSON() ([]byte, error) { // Type returns the node's resource type func (n *Node) Type(ctx context.Context) provider.ResourceType { + _, span := tracer.Start(ctx, "Type") + defer span.End() if n.nodeType != nil { return *n.nodeType } @@ -446,6 +448,8 @@ func (n *Node) Child(ctx context.Context, name string) (*Node, error) { // ParentWithReader returns the parent node func (n *Node) ParentWithReader(ctx context.Context, r io.Reader) (*Node, error) { + _, span := tracer.Start(ctx, "ParentWithReader") + defer span.End() if n.ParentID == "" { return nil, fmt.Errorf("decomposedfs: root has no parent") } @@ -1261,8 +1265,7 @@ func (n *Node) ProcessingID(ctx context.Context) (string, error) { // IsSpaceRoot checks if the node is a space root func (n *Node) IsSpaceRoot(ctx context.Context) bool { - _, err := n.Xattr(ctx, prefixes.SpaceNameAttr) - return err == nil + return n.ID == n.SpaceID } // SetScanData sets the virus scan info to the node diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/xattrs.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/xattrs.go index 621be8843d6..c31ebfde6eb 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/xattrs.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -68,6 +68,8 @@ func (md Attributes) Time(key string) (time.Time, error) { // SetXattrs sets multiple extended attributes on the write-through cache/node func (n *Node) SetXattrsWithContext(ctx context.Context, attribs map[string][]byte, acquireLock bool) (err error) { + _, span := tracer.Start(ctx, "SetXattrsWithContext") + defer span.End() if n.xattrsCache != nil { for k, v := range attribs { n.xattrsCache[k] = v diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/spacepermissions.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/spacepermissions.go index aa99a9cff3b..f22805dad6b 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/spacepermissions.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions/spacepermissions.go @@ -60,6 +60,8 @@ func (p Permissions) AssemblePermissions(ctx context.Context, n *node.Node) (*pr // AssembleTrashPermissions is used to assemble file permissions func (p Permissions) AssembleTrashPermissions(ctx context.Context, n *node.Node) (*provider.ResourcePermissions, error) { + _, span := tracer.Start(ctx, "AssembleTrashPermissions") + defer span.End() return p.item.AssembleTrashPermissions(ctx, n) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go index c9be0783f15..c9d2fbf0292 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go @@ -64,7 +64,8 @@ func (tb *DecomposedfsTrashbin) Setup(fs storage.FS) error { // ListRecycle returns the list of available recycle items // ref -> the space (= resourceid), key -> deleted node id, relativePath = relative to key func (tb *DecomposedfsTrashbin) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) { - + _, span := tracer.Start(ctx, "ListRecycle") + defer span.End() if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" { return nil, errtypes.BadRequest("spaceid required") } @@ -346,6 +347,8 @@ func (tb *DecomposedfsTrashbin) listTrashRoot(ctx context.Context, spaceID strin // RestoreRecycleItem restores the specified item func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error { + _, span := tracer.Start(ctx, "RestoreRecycleItem") + defer span.End() if ref == nil { return errtypes.BadRequest("missing reference, needs a space id") } @@ -399,6 +402,8 @@ func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *pro // PurgeRecycleItem purges the specified item, all its children and all their revisions func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error { + _, span := tracer.Start(ctx, "PurgeRecycleItem") + defer span.End() if ref == nil { return errtypes.BadRequest("missing reference, needs a space id") } @@ -429,6 +434,8 @@ func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provi // EmptyRecycle empties the trash func (tb *DecomposedfsTrashbin) EmptyRecycle(ctx context.Context, ref *provider.Reference) error { + _, span := tracer.Start(ctx, "EmptyRecycle") + defer span.End() if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" { return errtypes.BadRequest("spaceid must be set") } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/revisions.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/revisions.go index db7d730aa0c..bc3d4ffe3ac 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/revisions.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/revisions.go @@ -48,6 +48,8 @@ import ( // ListRevisions lists the revisions of the given resource func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Reference) (revisions []*provider.FileVersion, err error) { + _, span := tracer.Start(ctx, "ListRevisions") + defer span.End() var n *node.Node if n, err = fs.lu.NodeFromResource(ctx, ref); err != nil { return @@ -115,6 +117,8 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen // DownloadRevision returns a reader for the specified revision // FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813 func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) { + _, span := tracer.Start(ctx, "DownloadRevision") + defer span.End() log := appctx.GetLogger(ctx) // verify revision key format @@ -186,6 +190,8 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe // RestoreRevision restores the specified revision of the resource func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (returnErr error) { + _, span := tracer.Start(ctx, "RestoreRevision") + defer span.End() log := appctx.GetLogger(ctx) // verify revision key format @@ -330,6 +336,8 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer // DeleteRevision deletes the specified revision of the resource func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Reference, revisionKey string) error { + _, span := tracer.Start(ctx, "DeleteRevision") + defer span.End() n, err := fs.getRevisionNode(ctx, ref, revisionKey, func(rp *provider.ResourcePermissions) bool { return rp.RestoreFileVersion }) @@ -345,6 +353,8 @@ func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Refere } func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) { + _, span := tracer.Start(ctx, "getRevisionNode") + defer span.End() log := appctx.GetLogger(ctx) // verify revision key format diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go index 2f55b1b99a1..14e65dea064 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator/async.go @@ -183,7 +183,7 @@ func (p AsyncPropagator) queuePropagation(ctx context.Context, spaceID, nodeID s ready = true break } - log.Error().Err(err).Msg("failed to write Change to disk (retrying)") + log.Debug().Err(err).Msg("failed to write Change to disk (retrying)") err = os.Mkdir(filepath.Dir(changePath), 0700) triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation } @@ -386,7 +386,7 @@ func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string, // a negative new treesize. Something must have gone wrong with the accounting. // Reset the current treesize to 0. log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", pc.SizeDiff). - Msg("Error when updating treesize of node. Updated treesize < 0. Reestting to 0") + Msg("Error when updating treesize of node. Updated treesize < 0. Resetting to 0") newSize = 0 default: newSize = treeSize - uint64(-pc.SizeDiff) @@ -414,7 +414,7 @@ func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string, log.Info().Msg("Propagation done. cleaning up") cleanup() - if !n.IsSpaceRoot(ctx) { // This does not seem robust as it checks the space name property + if !n.IsSpaceRoot(ctx) { p.queuePropagation(ctx, n.SpaceID, n.ParentID, pc, log) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go index e2530530472..83096baf417 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go @@ -106,6 +106,8 @@ func (t *Tree) Setup() error { // GetMD returns the metadata of a node in the tree func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) { + _, span := tracer.Start(ctx, "GetMD") + defer span.End() md, err := os.Stat(n.InternalPath()) if err != nil { if errors.Is(err, fs.ErrNotExist) { @@ -119,6 +121,8 @@ func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) { // TouchFile creates a new empty file func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool, mtime string) error { + _, span := tracer.Start(ctx, "TouchFile") + defer span.End() if n.Exists { if markprocessing { return n.SetXattr(ctx, prefixes.StatusPrefix, []byte(node.ProcessingStatus)) @@ -223,6 +227,8 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) { // Move replaces the target with the source func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) { + _, span := tracer.Start(ctx, "Move") + defer span.End() if oldNode.SpaceID != newNode.SpaceID { // WebDAV RFC https://www.rfc-editor.org/rfc/rfc4918#section-9.9.4 says to use // > 502 (Bad Gateway) - This may occur when the destination is on another @@ -432,6 +438,8 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro // Delete deletes a node in the tree by moving it to the trash func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { + _, span := tracer.Start(ctx, "Delete") + defer span.End() path := filepath.Join(n.ParentPath(), n.Name) // remove entry from cache immediately to avoid inconsistencies defer func() { _ = t.idCache.Delete(path) }() @@ -524,6 +532,8 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { // RestoreRecycleItemFunc returns a node and a function to restore it from the trash. func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, targetNode *node.Node) (*node.Node, *node.Node, func() error, error) { + _, span := tracer.Start(ctx, "RestoreRecycleItemFunc") + defer span.End() logger := appctx.GetLogger(ctx) recycleNode, trashItem, deletedNodePath, origin, err := t.readRecycleItem(ctx, spaceid, key, trashPath) @@ -623,6 +633,8 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa // PurgeRecycleItemFunc returns a node and a function to purge it from the trash func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, path string) (*node.Node, func() error, error) { + _, span := tracer.Start(ctx, "PurgeRecycleItemFunc") + defer span.End() logger := appctx.GetLogger(ctx) rn, trashItem, deletedNodePath, _, err := t.readRecycleItem(ctx, spaceid, key, path) @@ -664,25 +676,38 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa // InitNewNode initializes a new node func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) { + _, span := tracer.Start(ctx, "InitNewNode") + defer span.End() // create folder structure (if needed) - if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + + _, subspan := tracer.Start(ctx, "os.MkdirAll") + err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700) + subspan.End() + if err != nil { return nil, err } // create and write lock new node metadata + _, subspan = tracer.Start(ctx, "metadata.Lock") unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath()) + subspan.End() if err != nil { return nil, err } // we also need to touch the actual node file here it stores the mtime of the resource + _, subspan = tracer.Start(ctx, "os.OpenFile") h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600) + subspan.End() if err != nil { return unlock, err } h.Close() - if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil { + _, subspan = tracer.Start(ctx, "node.CheckQuota") + _, err = node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize) + subspan.End() + if err != nil { return unlock, err } @@ -692,7 +717,10 @@ func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (met log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger() log.Info().Msg("initNewNode: creating symlink") - if err = os.Symlink(relativeNodePath, childNameLink); err != nil { + _, subspan = tracer.Start(ctx, "os.Symlink") + err = os.Symlink(relativeNodePath, childNameLink) + subspan.End() + if err != nil { log.Info().Err(err).Msg("initNewNode: symlink failed") if errors.Is(err, fs.ErrExist) { log.Info().Err(err).Msg("initNewNode: symlink already exists") @@ -854,6 +882,8 @@ var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`) // TODO refactor the returned params into Node properties? would make all the path transformations go away... func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (recycleNode *node.Node, trashItem string, deletedNodePath string, origin string, err error) { + _, span := tracer.Start(ctx, "readRecycleItem") + defer span.End() logger := appctx.GetLogger(ctx) if key == "" { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go index fcd70d4e290..0c7d4fd319f 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go @@ -47,6 +47,8 @@ import ( // TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. // Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (*provider.ResourceInfo, error) { + _, span := tracer.Start(ctx, "Upload") + defer span.End() up, err := fs.GetUpload(ctx, req.Ref.GetPath()) if err != nil { return &provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload") @@ -130,6 +132,8 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u // TODO read optional content for small files in this request // TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + _, span := tracer.Start(ctx, "InitiateUpload") + defer span.End() log := appctx.GetLogger(ctx) // remember the path from the reference diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/session.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/session.go index 072e32e3c21..bbcad253295 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/session.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/session.go @@ -81,6 +81,8 @@ func (s *OcisSession) executantUser() *userpb.User { // Purge deletes the upload session metadata and written binary data func (s *OcisSession) Purge(ctx context.Context) error { + _, span := tracer.Start(ctx, "Purge") + defer span.End() sessionPath := sessionPath(s.store.root, s.info.ID) f, err := lockedfile.OpenFile(sessionPath+".lock", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0600) if err != nil { @@ -112,6 +114,8 @@ func (s *OcisSession) TouchBin() error { // events can update the scan outcome and the finished event might read an empty file because of race conditions // so we need to lock the file while writing and use atomic writes func (s *OcisSession) Persist(ctx context.Context) error { + _, span := tracer.Start(ctx, "Persist") + defer span.End() sessionPath := sessionPath(s.store.root, s.info.ID) // create folder structure (if needed) if err := os.MkdirAll(filepath.Dir(sessionPath), 0700); err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go index d170fc5d872..df1e4df7a51 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go @@ -199,8 +199,8 @@ func (store OcisStore) Cleanup(ctx context.Context, session Session, revertNodeM // CreateNodeForUpload will create the target node for the Upload // TODO move this to the node package as NodeFromUpload? // should we in InitiateUpload create the node first? and then the upload? -func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.Attributes) (*node.Node, error) { - ctx, span := tracer.Start(session.Context(context.Background()), "CreateNodeForUpload") +func (store OcisStore) CreateNodeForUpload(ctx context.Context, session *OcisSession, initAttrs node.Attributes) (*node.Node, error) { + ctx, span := tracer.Start(session.Context(ctx), "CreateNodeForUpload") defer span.End() n := node.New( session.SpaceID(), @@ -303,6 +303,8 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. } func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (metadata.UnlockFunc, error) { + _, span := tracer.Start(ctx, "updateExistingNode") + defer span.End() targetPath := n.InternalPath() // write lock existing node before reading any metadata @@ -388,6 +390,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess } // clean revision file + span.AddEvent("os.Create") if _, err := os.Create(versionPath); err != nil { return unlock, err } @@ -405,6 +408,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess } session.info.MetaData["versionsPath"] = versionPath // keep mtime from previous version + span.AddEvent("os.Chtimes") if err := os.Chtimes(session.info.MetaData["versionsPath"], oldNodeMtime, oldNodeMtime); err != nil { return unlock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go index 7476d10058d..4a55035e961 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go @@ -184,7 +184,7 @@ func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error { } } - n, err := session.store.CreateNodeForUpload(session, attrs) + n, err := session.store.CreateNodeForUpload(ctx, session, attrs) if err != nil { return err } @@ -226,7 +226,7 @@ func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error { // for 0-byte uploads we take a shortcut and finalize isn't called elsewhere if !session.store.async || session.info.Size == 0 { // handle postprocessing synchronously - err = session.Finalize() + err = session.Finalize(ctx) session.store.Cleanup(ctx, session, err != nil, false, err == nil) if err != nil { log.Error().Err(err).Msg("failed to upload") @@ -279,8 +279,8 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo } // Finalize finalizes the upload (eg moves the file to the internal destination) -func (session *OcisSession) Finalize() (err error) { - ctx, span := tracer.Start(session.Context(context.Background()), "Finalize") +func (session *OcisSession) Finalize(ctx context.Context) (err error) { + ctx, span := tracer.Start(session.Context(ctx), "Finalize") defer span.End() revisionNode := node.New(session.SpaceID(), session.NodeID(), "", "", session.Size(), session.ID(), diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go index aa715024420..263976afd32 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go @@ -552,7 +552,11 @@ func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) { authCtx, span := tracer.Start(authCtx, "getAuthContext", trace.WithLinks(trace.LinkFromContext(ctx))) defer span.End() - client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr) + selector, err := pool.GatewaySelector(cs3.gatewayAddr) + if err != nil { + return nil, err + } + client, err := selector.Next() if err != nil { return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 4b8ac2e34d8..1b532da76e2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -367,7 +367,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.26.7 +# github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533 ## explicit; go 1.22.0 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime