Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/edge' into experimental
Browse files Browse the repository at this point in the history
  • Loading branch information
aduffeck committed Oct 13, 2022
2 parents 5af656e + 11cc78a commit 137a5f9
Show file tree
Hide file tree
Showing 20 changed files with 217 additions and 58 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/add-spaceowner-to-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Add SpaceOwner to some event

We added a SpaceOwner field to some of the events which can be used by
consumers to gain access to the affected space.

https://github.com/cs3org/reva/pull/3340
43 changes: 25 additions & 18 deletions internal/grpc/interceptors/eventsmiddleware/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
)

// ContainerCreated converts the response to an event
func ContainerCreated(r *provider.CreateContainerResponse, req *provider.CreateContainerRequest, executant *user.UserId) events.ContainerCreated {
func ContainerCreated(r *provider.CreateContainerResponse, req *provider.CreateContainerRequest, spaceOwner, executant *user.UserId) events.ContainerCreated {
return events.ContainerCreated{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
}
}

Expand Down Expand Up @@ -166,18 +167,20 @@ func LinkRemoved(r *link.RemovePublicShareResponse, req *link.RemovePublicShareR
}

// FileTouched converts the response to an event
func FileTouched(r *provider.TouchFileResponse, req *provider.TouchFileRequest, executant *user.UserId) events.FileTouched {
func FileTouched(r *provider.TouchFileResponse, req *provider.TouchFileRequest, spaceOwner, executant *user.UserId) events.FileTouched {
return events.FileTouched{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
}
}

// FileUploaded converts the response to an event
func FileUploaded(r *provider.InitiateFileUploadResponse, req *provider.InitiateFileUploadRequest, executant *user.UserId) events.FileUploaded {
func FileUploaded(r *provider.InitiateFileUploadResponse, req *provider.InitiateFileUploadRequest, spaceOwner, executant *user.UserId) events.FileUploaded {
return events.FileUploaded{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
}
}

Expand All @@ -190,11 +193,12 @@ func FileDownloaded(r *provider.InitiateFileDownloadResponse, req *provider.Init
}

// ItemTrashed converts the response to an event
func ItemTrashed(r *provider.DeleteResponse, req *provider.DeleteRequest, executant *user.UserId) events.ItemTrashed {
func ItemTrashed(r *provider.DeleteResponse, req *provider.DeleteRequest, spaceOwner, executant *user.UserId) events.ItemTrashed {
opaqueID := utils.ReadPlainFromOpaque(r.Opaque, "opaque_id")
return events.ItemTrashed{
Executant: executant,
Ref: req.Ref,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
ID: &provider.ResourceId{
StorageId: req.Ref.GetResourceId().GetStorageId(),
SpaceId: req.Ref.GetResourceId().GetSpaceId(),
Expand All @@ -204,8 +208,9 @@ func ItemTrashed(r *provider.DeleteResponse, req *provider.DeleteRequest, execut
}

// ItemMoved converts the response to an event
func ItemMoved(r *provider.MoveResponse, req *provider.MoveRequest, executant *user.UserId) events.ItemMoved {
func ItemMoved(r *provider.MoveResponse, req *provider.MoveRequest, spaceOwner, executant *user.UserId) events.ItemMoved {
return events.ItemMoved{
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Destination,
OldReference: req.Source,
Expand All @@ -221,12 +226,13 @@ func ItemPurged(r *provider.PurgeRecycleResponse, req *provider.PurgeRecycleRequ
}

// ItemRestored converts the response to an event
func ItemRestored(r *provider.RestoreRecycleItemResponse, req *provider.RestoreRecycleItemRequest, executant *user.UserId) events.ItemRestored {
func ItemRestored(r *provider.RestoreRecycleItemResponse, req *provider.RestoreRecycleItemRequest, spaceOwner, executant *user.UserId) events.ItemRestored {
ref := req.Ref
if req.RestoreRef != nil {
ref = req.RestoreRef
}
return events.ItemRestored{
SpaceOwner: spaceOwner,
Executant: executant,
Ref: ref,
OldReference: req.Ref,
Expand All @@ -235,11 +241,12 @@ func ItemRestored(r *provider.RestoreRecycleItemResponse, req *provider.RestoreR
}

// FileVersionRestored converts the response to an event
func FileVersionRestored(r *provider.RestoreFileVersionResponse, req *provider.RestoreFileVersionRequest, executant *user.UserId) events.FileVersionRestored {
func FileVersionRestored(r *provider.RestoreFileVersionResponse, req *provider.RestoreFileVersionRequest, spaceOwner, executant *user.UserId) events.FileVersionRestored {
return events.FileVersionRestored{
Executant: executant,
Ref: req.Ref,
Key: req.Key,
SpaceOwner: spaceOwner,
Executant: executant,
Ref: req.Ref,
Key: req.Key,
}
}

Expand Down
24 changes: 18 additions & 6 deletions internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
)
Expand All @@ -63,11 +64,22 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
}

interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// Register a channel in the context to receive the space owner id from the handler(s) further down the stack
var ownerID *user.UserId
sendOwnerChan := make(chan *user.UserId)
ctx = storagespace.ContextRegisterSendOwnerChan(ctx, sendOwnerChan)

res, err := handler(ctx, req)
if err != nil {
return res, err
}

// Read the space owner id from the channel
select {
case ownerID = <-sendOwnerChan:
default:
}

var executantID *user.UserId
u, ok := revactx.ContextGetUser(ctx)
if ok {
Expand Down Expand Up @@ -120,31 +132,31 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
}
case *provider.CreateContainerResponse:
if isSuccess(v) {
ev = ContainerCreated(v, req.(*provider.CreateContainerRequest), executantID)
ev = ContainerCreated(v, req.(*provider.CreateContainerRequest), ownerID, executantID)
}
case *provider.InitiateFileDownloadResponse:
if isSuccess(v) {
ev = FileDownloaded(v, req.(*provider.InitiateFileDownloadRequest), executantID)
}
case *provider.DeleteResponse:
if isSuccess(v) {
ev = ItemTrashed(v, req.(*provider.DeleteRequest), executantID)
ev = ItemTrashed(v, req.(*provider.DeleteRequest), ownerID, executantID)
}
case *provider.MoveResponse:
if isSuccess(v) {
ev = ItemMoved(v, req.(*provider.MoveRequest), executantID)
ev = ItemMoved(v, req.(*provider.MoveRequest), ownerID, executantID)
}
case *provider.PurgeRecycleResponse:
if isSuccess(v) {
ev = ItemPurged(v, req.(*provider.PurgeRecycleRequest), executantID)
}
case *provider.RestoreRecycleItemResponse:
if isSuccess(v) {
ev = ItemRestored(v, req.(*provider.RestoreRecycleItemRequest), executantID)
ev = ItemRestored(v, req.(*provider.RestoreRecycleItemRequest), ownerID, executantID)
}
case *provider.RestoreFileVersionResponse:
if isSuccess(v) {
ev = FileVersionRestored(v, req.(*provider.RestoreFileVersionRequest), executantID)
ev = FileVersionRestored(v, req.(*provider.RestoreFileVersionRequest), ownerID, executantID)
}
case *provider.CreateStorageSpaceResponse:
if isSuccess(v) && v.StorageSpace != nil { // TODO: Why are there CreateStorageSpaceResponses with nil StorageSpace?
Expand Down Expand Up @@ -172,7 +184,7 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
}
case *provider.TouchFileResponse:
if isSuccess(v) {
ev = FileTouched(v, req.(*provider.TouchFileRequest), executantID)
ev = FileTouched(v, req.(*provider.TouchFileRequest), ownerID, executantID)
}
}

Expand Down
39 changes: 23 additions & 16 deletions pkg/events/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (

// ContainerCreated is emitted when a directory has been created
type ContainerCreated struct {
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -41,9 +42,10 @@ func (ContainerCreated) Unmarshal(v []byte) (interface{}, error) {

// FileUploaded is emitted when a file is uploaded
type FileUploaded struct {
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -55,8 +57,9 @@ func (FileUploaded) Unmarshal(v []byte) (interface{}, error) {

// FileTouched is emitted when a file is uploaded
type FileTouched struct {
Executant *user.UserId
Ref *provider.Reference
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -82,10 +85,11 @@ func (FileDownloaded) Unmarshal(v []byte) (interface{}, error) {

// ItemTrashed is emitted when a file or folder is trashed
type ItemTrashed struct {
Executant *user.UserId
ID *provider.ResourceId
Ref *provider.Reference
Owner *user.UserId
SpaceOwner *user.UserId
Executant *user.UserId
ID *provider.ResourceId
Ref *provider.Reference
Owner *user.UserId
}

// Unmarshal to fulfill umarshaller interface
Expand All @@ -97,6 +101,7 @@ func (ItemTrashed) Unmarshal(v []byte) (interface{}, error) {

// ItemMoved is emitted when a file or folder is moved
type ItemMoved struct {
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
Expand Down Expand Up @@ -127,6 +132,7 @@ func (ItemPurged) Unmarshal(v []byte) (interface{}, error) {

// ItemRestored is emitted when a file or folder is restored from trashbin
type ItemRestored struct {
SpaceOwner *user.UserId
Executant *user.UserId
ID *provider.ResourceId
Ref *provider.Reference
Expand All @@ -144,10 +150,11 @@ func (ItemRestored) Unmarshal(v []byte) (interface{}, error) {

// FileVersionRestored is emitted when a file version is restored
type FileVersionRestored struct {
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
Key string
SpaceOwner *user.UserId
Executant *user.UserId
Ref *provider.Reference
Owner *user.UserId
Key string
}

// Unmarshal to fulfill umarshaller interface
Expand Down
9 changes: 5 additions & 4 deletions pkg/rhttp/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ type DataTX interface {
}

// EmitFileUploadedEvent is a helper function which publishes a FileUploaded event
func EmitFileUploadedEvent(owner *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error {
func EmitFileUploadedEvent(spaceOwnerOrManager, executant *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error {
if ref == nil || publisher == nil {
return nil
}

uploadedEv := events.FileUploaded{
Owner: owner,
Executant: owner,
Ref: ref,
SpaceOwner: spaceOwnerOrManager,
Owner: spaceOwnerOrManager,
Executant: executant,
Ref: ref,
}

return events.Publish(publisher, uploadedEv)
Expand Down
4 changes: 2 additions & 2 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
defer r.Body.Close()

ref := &provider.Reference{Path: fn}
info, err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) {
info, err := fs.Upload(ctx, ref, r.Body, func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) {
datatx.InvalidateCache(owner, ref, m.statCache)
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
sublog.Error().Err(err).Msg("failed to publish FileUploaded event")
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/rhttp/datatx/manager/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
Path: fn,
}
var info provider.ResourceInfo
info, err = fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) {
info, err = fs.Upload(ctx, ref, r.Body, func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) {
datatx.InvalidateCache(owner, ref, m.statCache)
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
sublog.Error().Err(err).Msg("failed to publish FileUploaded event")
}
})
Expand Down
5 changes: 4 additions & 1 deletion pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
for {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
Expand All @@ -123,7 +126,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
}
datatx.InvalidateCache(owner, ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/fs/owncloudsql/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r
if !ok {
return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context")
}
uff(owner.Id, uploadRef)
// spaces support in localfs needs to be revisited:
// * info.Storage["SpaceRoot"] is never set
// * there is no space owner or manager that could be passed to the UploadFinishedFunc
uff(owner.Id, owner.Id, uploadRef)
}

ri := provider.ResourceInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(owner *userpb.UserId, ref *provider.Reference)
type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference)

// FS is the interface to implement access to the storage.
type FS interface {
Expand Down
Loading

0 comments on commit 137a5f9

Please sign in to comment.