Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests-only] Merge edge into experimental #3345

Merged
merged 2 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/add-spaceowner-to-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Add SpaceOwner to some event

We added a SpaceOwner field to some of the events which can be used by
consumers to gain access to the affected space.

https://github.com/cs3org/reva/pull/3340
43 changes: 25 additions & 18 deletions internal/grpc/interceptors/eventsmiddleware/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
)

// ContainerCreated converts the response to an event
func ContainerCreated(r *provider.CreateContainerResponse, req *provider.CreateContainerRequest, executant *user.UserId) events.ContainerCreated {
func ContainerCreated(r *provider.CreateContainerResponse, req *provider.CreateContainerRequest, spaceOwner, executant *user.UserId) events.ContainerCreated {
return events.ContainerCreated{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
}
}

Expand Down Expand Up @@ -166,18 +167,20 @@ func LinkRemoved(r *link.RemovePublicShareResponse, req *link.RemovePublicShareR
}

// FileTouched converts the response to an event
func FileTouched(r *provider.TouchFileResponse, req *provider.TouchFileRequest, executant *user.UserId) events.FileTouched {
func FileTouched(r *provider.TouchFileResponse, req *provider.TouchFileRequest, spaceOwner, executant *user.UserId) events.FileTouched {
return events.FileTouched{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
}
}

// FileUploaded converts the response to an event
func FileUploaded(r *provider.InitiateFileUploadResponse, req *provider.InitiateFileUploadRequest, executant *user.UserId) events.FileUploaded {
func FileUploaded(r *provider.InitiateFileUploadResponse, req *provider.InitiateFileUploadRequest, spaceOwner, executant *user.UserId) events.FileUploaded {
return events.FileUploaded{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
}
}

Expand All @@ -190,11 +193,12 @@ func FileDownloaded(r *provider.InitiateFileDownloadResponse, req *provider.Init
}

// ItemTrashed converts the response to an event
func ItemTrashed(r *provider.DeleteResponse, req *provider.DeleteRequest, executant *user.UserId) events.ItemTrashed {
func ItemTrashed(r *provider.DeleteResponse, req *provider.DeleteRequest, spaceOwner, executant *user.UserId) events.ItemTrashed {
opaqueID := utils.ReadPlainFromOpaque(r.Opaque, "opaque_id")
return events.ItemTrashed{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
ID: &provider.ResourceId{
StorageId: req.Ref.GetResourceId().GetStorageId(),
SpaceId: req.Ref.GetResourceId().GetSpaceId(),
Expand All @@ -204,8 +208,9 @@ func ItemTrashed(r *provider.DeleteResponse, req *provider.DeleteRequest, execut
}

// ItemMoved converts the response to an event
func ItemMoved(r *provider.MoveResponse, req *provider.MoveRequest, executant *user.UserId) events.ItemMoved {
func ItemMoved(r *provider.MoveResponse, req *provider.MoveRequest, spaceOwner, executant *user.UserId) events.ItemMoved {
return events.ItemMoved{
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Destination,
OldReference: req.Source,
Expand All @@ -221,12 +226,13 @@ func ItemPurged(r *provider.PurgeRecycleResponse, req *provider.PurgeRecycleRequ
}

// ItemRestored converts the response to an event
func ItemRestored(r *provider.RestoreRecycleItemResponse, req *provider.RestoreRecycleItemRequest, executant *user.UserId) events.ItemRestored {
func ItemRestored(r *provider.RestoreRecycleItemResponse, req *provider.RestoreRecycleItemRequest, spaceOwner, executant *user.UserId) events.ItemRestored {
ref := req.Ref
if req.RestoreRef != nil {
ref = req.RestoreRef
}
return events.ItemRestored{
SpaceOwner: spaceOwner,
Executant: executant,
Ref: ref,
OldReference: req.Ref,
Expand All @@ -235,11 +241,12 @@ func ItemRestored(r *provider.RestoreRecycleItemResponse, req *provider.RestoreR
}

// FileVersionRestored converts the response to an event
func FileVersionRestored(r *provider.RestoreFileVersionResponse, req *provider.RestoreFileVersionRequest, executant *user.UserId) events.FileVersionRestored {
func FileVersionRestored(r *provider.RestoreFileVersionResponse, req *provider.RestoreFileVersionRequest, spaceOwner, executant *user.UserId) events.FileVersionRestored {
return events.FileVersionRestored{
Executant: executant,
Ref: req.Ref,
Key: req.Key,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
Key: req.Key,
}
}

Expand Down
24 changes: 18 additions & 6 deletions internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
)
Expand All @@ -63,11 +64,22 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
}

interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// Register a channel in the context to receive the space owner id from the handler(s) further down the stack
var ownerID *user.UserId
sendOwnerChan := make(chan *user.UserId)
ctx = storagespace.ContextRegisterSendOwnerChan(ctx, sendOwnerChan)

res, err := handler(ctx, req)
if err != nil {
return res, err
}

// Read the space owner id from the channel
select {
case ownerID = <-sendOwnerChan:
default:
}

var executantID *user.UserId
u, ok := revactx.ContextGetUser(ctx)
if ok {
Expand Down Expand Up @@ -120,31 +132,31 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
}
case *provider.CreateContainerResponse:
if isSuccess(v) {
ev = ContainerCreated(v, req.(*provider.CreateContainerRequest), executantID)
ev = ContainerCreated(v, req.(*provider.CreateContainerRequest), ownerID, executantID)
}
case *provider.InitiateFileDownloadResponse:
if isSuccess(v) {
ev = FileDownloaded(v, req.(*provider.InitiateFileDownloadRequest), executantID)
}
case *provider.DeleteResponse:
if isSuccess(v) {
ev = ItemTrashed(v, req.(*provider.DeleteRequest), executantID)
ev = ItemTrashed(v, req.(*provider.DeleteRequest), ownerID, executantID)
}
case *provider.MoveResponse:
if isSuccess(v) {
ev = ItemMoved(v, req.(*provider.MoveRequest), executantID)
ev = ItemMoved(v, req.(*provider.MoveRequest), ownerID, executantID)
}
case *provider.PurgeRecycleResponse:
if isSuccess(v) {
ev = ItemPurged(v, req.(*provider.PurgeRecycleRequest), executantID)
}
case *provider.RestoreRecycleItemResponse:
if isSuccess(v) {
ev = ItemRestored(v, req.(*provider.RestoreRecycleItemRequest), executantID)
ev = ItemRestored(v, req.(*provider.RestoreRecycleItemRequest), ownerID, executantID)
}
case *provider.RestoreFileVersionResponse:
if isSuccess(v) {
ev = FileVersionRestored(v, req.(*provider.RestoreFileVersionRequest), executantID)
ev = FileVersionRestored(v, req.(*provider.RestoreFileVersionRequest), ownerID, executantID)
}
case *provider.CreateStorageSpaceResponse:
if isSuccess(v) && v.StorageSpace != nil { // TODO: Why are there CreateStorageSpaceResponses with nil StorageSpace?
Expand Down Expand Up @@ -172,7 +184,7 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
}
case *provider.TouchFileResponse:
if isSuccess(v) {
ev = FileTouched(v, req.(*provider.TouchFileRequest), executantID)
ev = FileTouched(v, req.(*provider.TouchFileRequest), ownerID, executantID)
}
}

Expand Down
39 changes: 23 additions & 16 deletions pkg/events/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (

// ContainerCreated is emitted when a directory has been created
type ContainerCreated struct {
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -41,9 +42,10 @@ func (ContainerCreated) Unmarshal(v []byte) (interface{}, error) {

// FileUploaded is emitted when a file is uploaded
type FileUploaded struct {
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -55,8 +57,9 @@ func (FileUploaded) Unmarshal(v []byte) (interface{}, error) {

// FileTouched is emitted when a file is uploaded
type FileTouched struct {
Executant *user.UserId
Ref *provider.Reference
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -82,10 +85,11 @@ func (FileDownloaded) Unmarshal(v []byte) (interface{}, error) {

// ItemTrashed is emitted when a file or folder is trashed
type ItemTrashed struct {
Executant *user.UserId
ID *provider.ResourceId
Ref *provider.Reference
Owner *user.UserId
SpaceOwner *user.UserId
Executant *user.UserId
ID *provider.ResourceId
Ref *provider.Reference
Owner *user.UserId
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -97,6 +101,7 @@ func (ItemTrashed) Unmarshal(v []byte) (interface{}, error) {

// ItemMoved is emitted when a file or folder is moved
type ItemMoved struct {
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
Expand Down Expand Up @@ -127,6 +132,7 @@ func (ItemPurged) Unmarshal(v []byte) (interface{}, error) {

// ItemRestored is emitted when a file or folder is restored from trashbin
type ItemRestored struct {
SpaceOwner *user.UserId
Executant *user.UserId
ID *provider.ResourceId
Ref *provider.Reference
Expand All @@ -144,10 +150,11 @@ func (ItemRestored) Unmarshal(v []byte) (interface{}, error) {

// FileVersionRestored is emitted when a file version is restored
type FileVersionRestored struct {
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
Key string
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
Key string
}

// Unmarshal to fulfill umarshaller interface
Expand Down
9 changes: 5 additions & 4 deletions pkg/rhttp/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ type DataTX interface {
}

// EmitFileUploadedEvent is a helper function which publishes a FileUploaded event
func EmitFileUploadedEvent(owner *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error {
func EmitFileUploadedEvent(spaceOwnerOrManager, executant *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error {
if ref == nil || publisher == nil {
return nil
}

uploadedEv := events.FileUploaded{
Owner: owner,
Executant: owner,
Ref: ref,
SpaceOwner: spaceOwnerOrManager,
Owner: spaceOwnerOrManager,
Executant: executant,
Ref: ref,
}

return events.Publish(publisher, uploadedEv)
Expand Down
4 changes: 2 additions & 2 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
defer r.Body.Close()

ref := &provider.Reference{Path: fn}
info, err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) {
info, err := fs.Upload(ctx, ref, r.Body, func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) {
datatx.InvalidateCache(owner, ref, m.statCache)
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
sublog.Error().Err(err).Msg("failed to publish FileUploaded event")
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/rhttp/datatx/manager/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
Path: fn,
}
var info provider.ResourceInfo
info, err = fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) {
info, err = fs.Upload(ctx, ref, r.Body, func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) {
datatx.InvalidateCache(owner, ref, m.statCache)
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
sublog.Error().Err(err).Msg("failed to publish FileUploaded event")
}
})
Expand Down
5 changes: 4 additions & 1 deletion pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
for {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
Expand All @@ -123,7 +126,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
}
datatx.InvalidateCache(owner, ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/fs/owncloudsql/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r
if !ok {
return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context")
}
uff(owner.Id, uploadRef)
// spaces support in localfs needs to be revisited:
// * info.Storage["SpaceRoot"] is never set
// * there is no space owner or manager that could be passed to the UploadFinishedFunc
uff(owner.Id, owner.Id, uploadRef)
}

ri := provider.ResourceInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(owner *userpb.UserId, ref *provider.Reference)
type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference)

// FS is the interface to implement access to the storage.
type FS interface {
Expand Down
Loading