Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share manager migration #2911

Merged
merged 16 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/share-manager-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Allow for dumping and loading shares

We now have interfaces for dumpable and loadable share manages which can be used to migrate shares between share managers

https://github.com/cs3org/reva/pull/2911
77 changes: 43 additions & 34 deletions pkg/publicshare/manager/cs3/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cs3org/reva/v2/pkg/publicshare/manager/registry"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage/utils/indexer"
indexerErrors "github.com/cs3org/reva/v2/pkg/storage/utils/indexer/errors"
"github.com/cs3org/reva/v2/pkg/storage/utils/indexer/option"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
Expand All @@ -66,12 +67,6 @@ type Manager struct {
initialized bool
}

// PublicShareWithPassword represents a public share including its hashes password
type PublicShareWithPassword struct {
PublicShare *link.PublicShare `json:"public_share"`
HashedPassword string `json:"password"`
}

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
ProviderAddr string `mapstructure:"provider_addr"`
Expand Down Expand Up @@ -103,7 +98,7 @@ func NewDefault(m map[string]interface{}) (publicshare.Manager, error) {
}

// New returns a new manager instance
func New(gatewayClient gateway.GatewayAPIClient, storage metadata.Storage, indexer indexer.Indexer, passwordHashCost int) (publicshare.Manager, error) {
func New(gatewayClient gateway.GatewayAPIClient, storage metadata.Storage, indexer indexer.Indexer, passwordHashCost int) (*Manager, error) {
return &Manager{
gatewayClient: gatewayClient,
storage: storage,
Expand Down Expand Up @@ -161,6 +156,20 @@ func (m *Manager) initialize() error {
return nil
}

// Load imports public shares and received shares from channels (e.g. during migration)
func (m *Manager) Load(ctx context.Context, shareChan <-chan *publicshare.WithPassword) error {
log := appctx.GetLogger(ctx)
if err := m.initialize(); err != nil {
return err
}
for ps := range shareChan {
if err := m.persist(context.Background(), ps); err != nil {
log.Error().Err(err).Interface("publicshare", ps).Msg("error loading public share")
}
}
return nil
}

// CreatePublicShare creates a new public share
func (m *Manager) CreatePublicShare(ctx context.Context, u *user.User, ri *provider.ResourceInfo, g *link.Grant) (*link.PublicShare, error) {
if err := m.initialize(); err != nil {
Expand Down Expand Up @@ -200,8 +209,8 @@ func (m *Manager) CreatePublicShare(ctx context.Context, u *user.User, ri *provi
Nanos: uint32(now % int64(time.Second)),
}

s := &PublicShareWithPassword{
PublicShare: &link.PublicShare{
s := &publicshare.WithPassword{
PublicShare: link.PublicShare{
Id: id,
Owner: ri.GetOwner(),
Creator: u.Id,
Expand All @@ -215,15 +224,15 @@ func (m *Manager) CreatePublicShare(ctx context.Context, u *user.User, ri *provi
DisplayName: displayName,
Quicklink: quicklink,
},
HashedPassword: password,
Password: password,
}

err := m.persist(ctx, s)
if err != nil {
return nil, err
}

return s.PublicShare, nil
return &s.PublicShare, nil
}

// UpdatePublicShare updates an existing public share
Expand All @@ -249,7 +258,7 @@ func (m *Manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link
if err != nil {
return nil, errors.Wrap(err, "could not hash share password")
}
ps.HashedPassword = string(h)
ps.Password = string(h)
ps.PublicShare.PasswordProtected = true
default:
return nil, errtypes.BadRequest("no valid update type given")
Expand All @@ -260,7 +269,7 @@ func (m *Manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link
return nil, err
}

return ps.PublicShare, nil
return &ps.PublicShare, nil
}

// GetPublicShare returns an existing public share
Expand All @@ -275,16 +284,16 @@ func (m *Manager) GetPublicShare(ctx context.Context, u *user.User, ref *link.Pu
}

if ps.PublicShare.PasswordProtected && sign {
err = publicshare.AddSignature(ps.PublicShare, ps.HashedPassword)
err = publicshare.AddSignature(&ps.PublicShare, ps.Password)
if err != nil {
return nil, err
}
}

return ps.PublicShare, nil
return &ps.PublicShare, nil
}

func (m *Manager) getWithPassword(ctx context.Context, ref *link.PublicShareReference) (*PublicShareWithPassword, error) {
func (m *Manager) getWithPassword(ctx context.Context, ref *link.PublicShareReference) (*publicshare.WithPassword, error) {
switch {
case ref.GetToken() != "":
return m.getByToken(ctx, ref.GetToken())
Expand All @@ -295,7 +304,7 @@ func (m *Manager) getWithPassword(ctx context.Context, ref *link.PublicShareRefe
}
}

func (m *Manager) getByID(ctx context.Context, id string) (*PublicShareWithPassword, error) {
func (m *Manager) getByID(ctx context.Context, id string) (*publicshare.WithPassword, error) {
tokens, err := m.indexer.FindBy(&link.PublicShare{},
indexer.NewField("Id.OpaqueId", id),
)
Expand All @@ -308,14 +317,14 @@ func (m *Manager) getByID(ctx context.Context, id string) (*PublicShareWithPassw
return m.getByToken(ctx, tokens[0])
}

func (m *Manager) getByToken(ctx context.Context, token string) (*PublicShareWithPassword, error) {
func (m *Manager) getByToken(ctx context.Context, token string) (*publicshare.WithPassword, error) {
fn := path.Join("publicshares", token)
data, err := m.storage.SimpleDownload(ctx, fn)
if err != nil {
return nil, err
}

ps := &PublicShareWithPassword{}
ps := &publicshare.WithPassword{}
err = json.Unmarshal(data, ps)
if err != nil {
return nil, err
Expand Down Expand Up @@ -349,11 +358,11 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
return nil, err
}

if !publicshare.MatchesFilters(*ps.PublicShare, filters) {
if !publicshare.MatchesFilters(ps.PublicShare, filters) {
continue
}

if publicshare.IsExpired(*ps.PublicShare) {
if publicshare.IsExpired(ps.PublicShare) {
ref := &link.PublicShareReference{
Spec: &link.PublicShareReference_Id{
Id: ps.PublicShare.Id,
Expand All @@ -369,12 +378,12 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
}

if ps.PublicShare.PasswordProtected && sign {
err = publicshare.AddSignature(ps.PublicShare, ps.HashedPassword)
err = publicshare.AddSignature(&ps.PublicShare, ps.Password)
if err != nil {
return nil, err
}
}
result = append(result, ps.PublicShare)
result = append(result, &ps.PublicShare)
shareMem[ps.PublicShare.Token] = struct{}{}
}

Expand Down Expand Up @@ -433,8 +442,8 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
if err != nil {
return nil, err
}
if publicshare.MatchesFilters(*s.PublicShare, filters) {
result = append(result, s.PublicShare)
if publicshare.MatchesFilters(s.PublicShare, filters) {
result = append(result, &s.PublicShare)
shareMem[s.PublicShare.Token] = struct{}{}
}
}
Expand Down Expand Up @@ -473,17 +482,17 @@ func (m *Manager) GetPublicShareByToken(ctx context.Context, token string, auth
return nil, err
}

if publicshare.IsExpired(*ps.PublicShare) {
if publicshare.IsExpired(ps.PublicShare) {
return nil, errtypes.NotFound("public share has expired")
}

if ps.PublicShare.PasswordProtected {
if !publicshare.Authenticate(ps.PublicShare, ps.HashedPassword, auth) {
if !publicshare.Authenticate(&ps.PublicShare, ps.Password, auth) {
return nil, errtypes.InvalidCredentials("access denied")
}
}

return ps.PublicShare, nil
return &ps.PublicShare, nil
}

func indexOwnerFunc(v interface{}) (string, error) {
Expand Down Expand Up @@ -518,7 +527,7 @@ func resourceIDToIndex(id *provider.ResourceId) string {
return strings.Join([]string{id.StorageId, id.OpaqueId}, "!")
}

func (m *Manager) persist(ctx context.Context, ps *PublicShareWithPassword) error {
func (m *Manager) persist(ctx context.Context, ps *publicshare.WithPassword) error {
data, err := json.Marshal(ps)
if err != nil {
return err
Expand All @@ -530,16 +539,16 @@ func (m *Manager) persist(ctx context.Context, ps *PublicShareWithPassword) erro
return err
}

_, err = m.indexer.Add(ps.PublicShare)
_, err = m.indexer.Add(&ps.PublicShare)
if err != nil {
if _, ok := err.(errtypes.IsAlreadyExists); !ok {
return err
if _, ok := err.(*indexerErrors.AlreadyExistsErr); ok {
return nil
}
err = m.indexer.Delete(ps.PublicShare)
err = m.indexer.Delete(&ps.PublicShare)
if err != nil {
return err
}
_, err = m.indexer.Add(ps.PublicShare)
_, err = m.indexer.Add(&ps.PublicShare)
return err
}

Expand Down
47 changes: 39 additions & 8 deletions pkg/publicshare/manager/cs3/cs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path"
"strings"
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -146,10 +147,10 @@ var _ = Describe("Cs3", func() {
Expect(link.Token).ToNot(Equal(""))
Expect(link.PasswordProtected).To(BeTrue())
storage.AssertCalled(GinkgoT(), "SimpleUpload", mock.Anything, mock.Anything, mock.MatchedBy(func(in []byte) bool {
ps := cs3.PublicShareWithPassword{}
ps := publicshare.WithPassword{}
err = json.Unmarshal(in, &ps)
Expect(err).ToNot(HaveOccurred())
return bcrypt.CompareHashAndPassword([]byte(ps.HashedPassword), []byte("secret123")) == nil
return bcrypt.CompareHashAndPassword([]byte(ps.Password), []byte("secret123")) == nil
}))
})

Expand Down Expand Up @@ -182,13 +183,43 @@ var _ = Describe("Cs3", func() {
h, err := bcrypt.GenerateFromPassword([]byte(grant.Password), bcrypt.DefaultCost)
Expect(err).ToNot(HaveOccurred())
hashedPassword = string(h)
shareJSON, err := json.Marshal(cs3.PublicShareWithPassword{PublicShare: existingShare, HashedPassword: hashedPassword})
shareJSON, err := json.Marshal(publicshare.WithPassword{PublicShare: *existingShare, Password: hashedPassword})
Expect(err).ToNot(HaveOccurred())
storage.On("SimpleDownload", mock.Anything, mock.MatchedBy(func(in string) bool {
return strings.HasPrefix(in, "publicshares/")
})).Return(shareJSON, nil)
})

Describe("Load", func() {
It("loads shares including state and mountpoint information", func() {
m, err := cs3.New(nil, storage, indexer, bcrypt.DefaultCost)
Expect(err).ToNot(HaveOccurred())

sharesChan := make(chan *publicshare.WithPassword)

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
err := m.Load(ctx, sharesChan)
Expect(err).ToNot(HaveOccurred())
wg.Done()
}()
go func() {
sharesChan <- &publicshare.WithPassword{
Password: "foo",
PublicShare: *existingShare,
}
close(sharesChan)
wg.Done()
}()
wg.Wait()
Eventually(sharesChan).Should(BeClosed())

expectedPath := path.Join("publicshares", existingShare.Token)
storage.AssertCalled(GinkgoT(), "SimpleUpload", mock.Anything, expectedPath, mock.Anything)
})
})

Describe("ListPublicShares", func() {
It("lists existing shares", func() {
shares, err := m.ListPublicShares(ctx, user, []*link.ListPublicSharesRequest_Filter{}, false)
Expand Down Expand Up @@ -439,7 +470,7 @@ var _ = Describe("Cs3", func() {
Expect(ps).ToNot(BeNil())
Expect(ps.DisplayName).To(Equal("new displayname"))
storage.AssertCalled(GinkgoT(), "SimpleUpload", mock.Anything, path.Join("publicshares", ps.Token), mock.MatchedBy(func(data []byte) bool {
s := cs3.PublicShareWithPassword{}
s := publicshare.WithPassword{}
err := json.Unmarshal(data, &s)
Expect(err).ToNot(HaveOccurred())
return s.PublicShare.DisplayName == "new displayname"
Expand All @@ -457,10 +488,10 @@ var _ = Describe("Cs3", func() {
Expect(err).ToNot(HaveOccurred())
Expect(ps).ToNot(BeNil())
storage.AssertCalled(GinkgoT(), "SimpleUpload", mock.Anything, path.Join("publicshares", ps.Token), mock.MatchedBy(func(data []byte) bool {
s := cs3.PublicShareWithPassword{}
s := publicshare.WithPassword{}
err := json.Unmarshal(data, &s)
Expect(err).ToNot(HaveOccurred())
return s.HashedPassword != ""
return s.Password != ""
}))
})

Expand All @@ -475,7 +506,7 @@ var _ = Describe("Cs3", func() {
Expect(err).ToNot(HaveOccurred())
Expect(ps).ToNot(BeNil())
storage.AssertCalled(GinkgoT(), "SimpleUpload", mock.Anything, path.Join("publicshares", ps.Token), mock.MatchedBy(func(data []byte) bool {
s := cs3.PublicShareWithPassword{}
s := publicshare.WithPassword{}
err := json.Unmarshal(data, &s)
Expect(err).ToNot(HaveOccurred())
return s.PublicShare.Permissions.Permissions.Delete
Expand All @@ -494,7 +525,7 @@ var _ = Describe("Cs3", func() {
Expect(err).ToNot(HaveOccurred())
Expect(ps).ToNot(BeNil())
storage.AssertCalled(GinkgoT(), "SimpleUpload", mock.Anything, path.Join("publicshares", ps.Token), mock.MatchedBy(func(data []byte) bool {
s := cs3.PublicShareWithPassword{}
s := publicshare.WithPassword{}
err := json.Unmarshal(data, &s)
Expect(err).ToNot(HaveOccurred())
return s.PublicShare.Expiration != nil && s.PublicShare.Expiration.Seconds == ts.Seconds
Expand Down
31 changes: 26 additions & 5 deletions pkg/publicshare/manager/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ func (m *manager) startJanitorRun() {
}
}

// Dump exports public shares to channels (e.g. during migration)
func (m *manager) Dump(ctx context.Context, shareChan chan<- *publicshare.WithPassword) error {
log := appctx.GetLogger(ctx)

m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
if err != nil {
return err
}

for _, v := range db {
var local publicshare.WithPassword
if err := utils.UnmarshalJSONToProtoV1([]byte(v.(map[string]interface{})["share"].(string)), &local.PublicShare); err != nil {
log.Error().Err(err).Msg("error unmarshalling share")
}
local.Password = v.(map[string]interface{})["password"].(string)
shareChan <- &local
}
close(shareChan)

return nil
}

// CreatePublicShare adds a new entry to manager.shares
func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *provider.ResourceInfo, g *link.Grant) (*link.PublicShare, error) {
id := &link.PublicShareId{
Expand Down Expand Up @@ -590,11 +615,7 @@ func (m *manager) writeDb(db map[string]interface{}) error {
return err
}

if err := ioutil.WriteFile(m.file, dbAsJSON, 0644); err != nil {
return err
}

return nil
return ioutil.WriteFile(m.file, dbAsJSON, 0644)
}

type publicShare struct {
Expand Down
Loading