Skip to content

Commit

Permalink
publish event when share expired
Browse files Browse the repository at this point in the history
  • Loading branch information
David Christofas committed Jan 12, 2023
1 parent 69a8324 commit b6c1d39
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 19 deletions.
137 changes: 128 additions & 9 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
package jsoncs3

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"io"
"os"
"strings"
"sync"
"time"
Expand All @@ -38,6 +43,8 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache"
Expand All @@ -48,6 +55,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
)

/*
Expand Down Expand Up @@ -106,12 +114,21 @@ func init() {
}

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"`
CacheTTL int `mapstructure:"ttl"`
GatewayAddr string `mapstructure:"gateway_addr"`
ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"`
CacheTTL int `mapstructure:"ttl"`
Events EventOptions `mapstructure:"events"`
}

// EventOptions are the configurable options for events
type EventOptions struct {
NatsAddress string `mapstructure:"natsaddress"`
NatsClusterID string `mapstructure:"natsclusterid"`
TLSInsecure bool `mapstructure:"tlsinsecure"`
TLSRootCACertificate string `mapstructure:"tlsrootcacertificate"`
}

// Manager implements a share manager using a cs3 storage backend with local caching
Expand All @@ -128,7 +145,8 @@ type Manager struct {

initialized bool

gateway gatewayv1beta1.GatewayAPIClient
gateway gatewayv1beta1.GatewayAPIClient
eventStream events.Stream
}

// NewDefault returns a new manager instance with default dependencies
Expand All @@ -149,11 +167,49 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
return nil, err
}

return New(s, gc, c.CacheTTL)
var es events.Stream
if c.Events.NatsAddress != "" {
evtsCfg := c.Events
var (
rootCAPool *x509.CertPool
tlsConf *tls.Config
)
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return nil, err
}

var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, rootCrtFile); err != nil {
return nil, err
}

rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
evtsCfg.TLSInsecure = false

tlsConf = &tls.Config{
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}

es, err = stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.NatsAddress),
natsjs.ClusterID(evtsCfg.NatsClusterID),
)
if err != nil {
return nil, err
}
}

return New(s, gc, c.CacheTTL, es)
}

// New returns a new manager instance.
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int) (*Manager, error) {
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{
Cache: providercache.New(s, ttl),
Expand All @@ -162,6 +218,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int)
GroupReceivedCache: sharecache.New(s, "groups", "received.json", ttl),
storage: s,
gateway: gc,
eventStream: es,
}, nil
}

Expand Down Expand Up @@ -366,6 +423,22 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
if err != nil {
return nil, err
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
}
// check if we are the creator or the grantee
// TODO allow manager to get shares in a space created by other users
user := ctxpkg.ContextMustGetUser(ctx)
Expand Down Expand Up @@ -535,6 +608,16 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}
if !share.MatchesFilters(s, filters) {
Expand Down Expand Up @@ -587,6 +670,16 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}
if utils.UserEqual(user.GetId(), s.GetCreator()) {
Expand Down Expand Up @@ -668,6 +761,16 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}

Expand Down Expand Up @@ -725,6 +828,22 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
if !share.IsGrantedToUser(s, user) {
return nil, errtypes.NotFound(ref.String())
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
}
return m.convert(ctx, user.Id.GetOpaqueId(), s), nil
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/share/manager/jsoncs3/jsoncs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(err).ToNot(HaveOccurred())

client = &mocks.GatewayAPIClient{}
m, err = jsoncs3.New(storage, client, 0)
m, err = jsoncs3.New(storage, client, 0, nil)
Expect(err).ToNot(HaveOccurred())
})

Expand Down Expand Up @@ -250,7 +250,7 @@ var _ = Describe("Jsoncs3", func() {
})
Expect(s).ToNot(BeNil())

m, err = jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s = shareBykey(&collaboration.ShareKey{
Expand Down Expand Up @@ -444,7 +444,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("loads the cache when it doesn't have an entry", func() {
m, err := jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s, err := m.GetShare(ctx, shareRef)
Expand Down Expand Up @@ -504,7 +504,7 @@ var _ = Describe("Jsoncs3", func() {
})
Expect(err).ToNot(HaveOccurred())

m, err = jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s, err := m.GetShare(ctx, &collaboration.ShareReference{
Expand Down Expand Up @@ -617,7 +617,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(us).ToNot(BeNil())
Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue())

m, err = jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s = shareBykey(&collaboration.ShareKey{
Expand Down Expand Up @@ -748,7 +748,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncronizes the user received cache before listing", func() {
m, err := jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{})
Expand Down Expand Up @@ -816,7 +816,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncronizes the group received cache before listing", func() {
m, err := jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{})
Expand Down Expand Up @@ -860,7 +860,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncs the cache", func() {
m, err := jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down Expand Up @@ -894,7 +894,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncs the cache", func() {
m, err := jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down Expand Up @@ -1017,7 +1017,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(err).ToNot(HaveOccurred())
Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED))

m, err := jsoncs3.New(storage, nil, 0) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down

0 comments on commit b6c1d39

Please sign in to comment.