Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to msgpack #3728

Merged
merged 10 commits into from
Mar 15, 2023
Next Next commit
Refactor tree migrations into a separate subpackage
aduffeck committed Mar 14, 2023
commit 5d227f99d9cdf2f38f2e86270c9aad6d604da5c2
9 changes: 9 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree"
@@ -135,6 +136,14 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
return nil, errors.Wrap(err, "could not setup tree")
}

// Run migrations & return
m := migrator.New(lu, log)
err = m.RunMigrations()
if err != nil {
log.Error().Err(err).Msg("could not migrate tree")
return nil, errors.Wrap(err, "could not migrate tree")
}

if o.MaxAcquireLockCycles != 0 {
filelocks.SetMaxLockCycles(o.MaxAcquireLockCycles)
}
125 changes: 125 additions & 0 deletions pkg/storage/utils/decomposedfs/migrator/0001.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2018-2023 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package migrator

import (
"errors"
"os"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

func (m *Migrator) migration0001() error {
// create spaces folder and iterate over existing nodes to populate it
nodesPath := filepath.Join(m.lu.InternalRoot(), "nodes")
fi, err := os.Stat(nodesPath)
if err == nil && fi.IsDir() {

f, err := os.Open(nodesPath)
if err != nil {
return err
}
nodes, err := f.Readdir(0)
if err != nil {
return err
}

for _, n := range nodes {
nodePath := filepath.Join(nodesPath, n.Name())

attr, err := m.lu.MetadataBackend().Get(nodePath, prefixes.ParentidAttr)
if err == nil && attr == node.RootID {
if err := m.moveNode(n.Name(), n.Name()); err != nil {
logger.New().Error().Err(err).
Str("space", n.Name()).
Msg("could not move space")
continue
}
m.linkSpaceNode("personal", n.Name())
}
}
// TODO delete nodesPath if empty
}
return nil
}

func (m *Migrator) moveNode(spaceID, nodeID string) error {
dirPath := filepath.Join(m.lu.InternalRoot(), "nodes", nodeID)
f, err := os.Open(dirPath)
if err != nil {
return err
}
children, err := f.Readdir(0)
if err != nil {
return err
}
for _, child := range children {
old := filepath.Join(m.lu.InternalRoot(), "nodes", child.Name())
new := filepath.Join(m.lu.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(child.Name(), 4, 2))
if err := os.Rename(old, new); err != nil {
logger.New().Error().Err(err).
Str("space", spaceID).
Str("nodes", child.Name()).
Str("oldpath", old).
Str("newpath", new).
Msg("could not rename node")
}
if child.IsDir() {
if err := m.moveNode(spaceID, child.Name()); err != nil {
return err
}
}
}
return nil
}

// linkSpace creates a new symbolic link for a space with the given type st, and node id
func (m *Migrator) linkSpaceNode(spaceType, spaceID string) {
spaceTypesPath := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType, spaceID)
expectedTarget := "../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
linkTarget, err := os.Readlink(spaceTypesPath)
if errors.Is(err, os.ErrNotExist) {
err = os.Symlink(expectedTarget, spaceTypesPath)
if err != nil {
logger.New().Error().Err(err).
Str("space_type", spaceType).
Str("space", spaceID).
Msg("could not create symlink")
}
} else {
if err != nil {
logger.New().Error().Err(err).
Str("space_type", spaceType).
Str("space", spaceID).
Msg("could not read symlink")
}
if linkTarget != expectedTarget {
logger.New().Warn().
Str("space_type", spaceType).
Str("space", spaceID).
Str("expected", expectedTarget).
Str("actual", linkTarget).
Msg("expected a different link target")
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2018-2021 CERN
// Copyright 2018-2023 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,66 +16,18 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package tree
package migrator

import (
"io"
"os"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

/**
* This function runs all migrations in sequence.
* Note this sequence must not be changed or it might
* damage existing decomposed fs.
*/
func (t *Tree) runMigrations() error {
if err := t.migration0001Nodes(); err != nil {
return err
}
return t.migration0002SpaceTypes()
}

func (t *Tree) migration0001Nodes() error {
// create spaces folder and iterate over existing nodes to populate it
nodesPath := filepath.Join(t.root, "nodes")
fi, err := os.Stat(nodesPath)
if err == nil && fi.IsDir() {

f, err := os.Open(nodesPath)
if err != nil {
return err
}
nodes, err := f.Readdir(0)
if err != nil {
return err
}

for _, n := range nodes {
nodePath := filepath.Join(nodesPath, n.Name())

attr, err := t.lookup.MetadataBackend().Get(nodePath, prefixes.ParentidAttr)
if err == nil && string(attr) == node.RootID {
if err := t.moveNode(n.Name(), n.Name()); err != nil {
logger.New().Error().Err(err).
Str("space", n.Name()).
Msg("could not move space")
continue
}
t.linkSpaceNode("personal", n.Name())
}
}
// TODO delete nodesPath if empty
}
return nil
}

func (t *Tree) migration0002SpaceTypes() error {
spaceTypesPath := filepath.Join(t.root, "spacetypes")
func (m *Migrator) migration0002() error {
spaceTypesPath := filepath.Join(m.lu.InternalRoot(), "spacetypes")
fi, err := os.Stat(spaceTypesPath)
if err == nil && fi.IsDir() {

@@ -89,7 +41,7 @@ func (t *Tree) migration0002SpaceTypes() error {
}

for _, st := range spaceTypes {
err := t.moveSpaceType(st.Name())
err := m.moveSpaceType(st.Name())
if err != nil {
logger.New().Error().Err(err).
Str("space", st.Name()).
@@ -124,3 +76,60 @@ func (t *Tree) migration0002SpaceTypes() error {
}
return nil
}

func (m *Migrator) moveSpaceType(spaceType string) error {
dirPath := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType)
f, err := os.Open(dirPath)
if err != nil {
return err
}
children, err := f.Readdir(0)
if err != nil {
return err
}
for _, child := range children {
old := filepath.Join(m.lu.InternalRoot(), "spacetypes", spaceType, child.Name())
target, err := os.Readlink(old)
if err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("oldLink", old).
Msg("could not read old symlink")
continue
}
newDir := filepath.Join(m.lu.InternalRoot(), "indexes", "by-type", spaceType)
if err := os.MkdirAll(newDir, 0700); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("targetDir", newDir).
Msg("could not read old symlink")
}
newLink := filepath.Join(newDir, child.Name())
if err := os.Symlink(filepath.Join("..", target), newLink); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("oldpath", old).
Str("newpath", newLink).
Msg("could not rename node")
continue
}
if err := os.Remove(old); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("oldLink", old).
Msg("could not remove old symlink")
continue
}
}
if err := os.Remove(dirPath); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("dir", dirPath).
Msg("could not remove spaces folder, folder probably not empty")
}
return nil
}
110 changes: 110 additions & 0 deletions pkg/storage/utils/decomposedfs/migrator/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package migrator

import (
"encoding/json"
"os"
"path/filepath"
"reflect"

"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/rs/zerolog"
)

var allMigrations = []string{"0001", "0002"}

type migrationFunc func() error
type migrationState struct {
state string
message string
}
type migrationStates map[string]migrationState

// Migrator runs migrations on an existing decomposedfs
type Migrator struct {
lu *lookup.Lookup
states migrationStates
log *zerolog.Logger
}

// New returns a new Migrator instance
func New(lu *lookup.Lookup, log *zerolog.Logger) Migrator {
return Migrator{
lu: lu,
log: log,
}
}

/**
* RunMigrations runs all migrations in sequence.
* Note this sequence must not be changed or it might
* damage existing decomposed fs.
*/
func (m *Migrator) RunMigrations() error {
err := m.readStates()
if err != nil {
return err
}

for _, migration := range allMigrations {
if s, ok := m.states[migration]; !ok || s.state != "succeeded" {
if s.state != "" {
m.log.Info().Msg("Re-running migration " + migration + "...")
} else {
m.log.Info().Msg("Running migration " + migration + "...")
}
migrateMethod := reflect.ValueOf(m).MethodByName("migration" + migration)
v := migrateMethod.Call(nil)
err := v[0].Interface().(error)
if err != nil {
m.log.Error().Err(err).Msg("migration " + migration + " failed")
s.state = "failed"
s.message = err.Error()
m.writeStates()
return err
}
}
}
return nil
}

func (m *Migrator) readStates() error {
d, err := os.ReadFile(filepath.Join(m.lu.InternalRoot(), ".migrations"))
if err != nil {
if !os.IsNotExist(err) {
return err
}
}

if len(d) > 0 {
json.Unmarshal(d, &m.states)
}

return nil
}

func (m *Migrator) writeStates() error {
d, err := json.Marshal(m.states)
if err != nil {
m.log.Error().Err(err).Msg("could not marshal migration states")
return nil
}
return os.WriteFile(filepath.Join(m.lu.InternalRoot(), ".migrations"), d, 0600)
}
120 changes: 0 additions & 120 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
@@ -110,128 +109,9 @@ func (t *Tree) Setup() error {
return err
}
}
// Run migrations & return
return t.runMigrations()
}

func (t *Tree) moveNode(spaceID, nodeID string) error {
dirPath := filepath.Join(t.root, "nodes", nodeID)
f, err := os.Open(dirPath)
if err != nil {
return err
}
children, err := f.Readdir(0)
if err != nil {
return err
}
for _, child := range children {
old := filepath.Join(t.root, "nodes", child.Name())
new := filepath.Join(t.root, "spaces", lookup.Pathify(spaceID, 1, 2), "nodes", lookup.Pathify(child.Name(), 4, 2))
if err := os.Rename(old, new); err != nil {
logger.New().Error().Err(err).
Str("space", spaceID).
Str("nodes", child.Name()).
Str("oldpath", old).
Str("newpath", new).
Msg("could not rename node")
}
if child.IsDir() {
if err := t.moveNode(spaceID, child.Name()); err != nil {
return err
}
}
}
return nil
}

func (t *Tree) moveSpaceType(spaceType string) error {
dirPath := filepath.Join(t.root, "spacetypes", spaceType)
f, err := os.Open(dirPath)
if err != nil {
return err
}
children, err := f.Readdir(0)
if err != nil {
return err
}
for _, child := range children {
old := filepath.Join(t.root, "spacetypes", spaceType, child.Name())
target, err := os.Readlink(old)
if err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("oldLink", old).
Msg("could not read old symlink")
continue
}
newDir := filepath.Join(t.root, "indexes", "by-type", spaceType)
if err := os.MkdirAll(newDir, 0700); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("targetDir", newDir).
Msg("could not read old symlink")
}
newLink := filepath.Join(newDir, child.Name())
if err := os.Symlink(filepath.Join("..", target), newLink); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("oldpath", old).
Str("newpath", newLink).
Msg("could not rename node")
continue
}
if err := os.Remove(old); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("nodes", child.Name()).
Str("oldLink", old).
Msg("could not remove old symlink")
continue
}
}
if err := os.Remove(dirPath); err != nil {
logger.New().Error().Err(err).
Str("space", spaceType).
Str("dir", dirPath).
Msg("could not remove spaces folder, folder probably not empty")
}
return nil
}

// linkSpace creates a new symbolic link for a space with the given type st, and node id
func (t *Tree) linkSpaceNode(spaceType, spaceID string) {
spaceTypesPath := filepath.Join(t.root, "spacetypes", spaceType, spaceID)
expectedTarget := "../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
linkTarget, err := os.Readlink(spaceTypesPath)
if errors.Is(err, os.ErrNotExist) {
err = os.Symlink(expectedTarget, spaceTypesPath)
if err != nil {
logger.New().Error().Err(err).
Str("space_type", spaceType).
Str("space", spaceID).
Msg("could not create symlink")
}
} else {
if err != nil {
logger.New().Error().Err(err).
Str("space_type", spaceType).
Str("space", spaceID).
Msg("could not read symlink")
}
if linkTarget != expectedTarget {
logger.New().Warn().
Str("space_type", spaceType).
Str("space", spaceID).
Str("expected", expectedTarget).
Str("actual", linkTarget).
Msg("expected a different link target")
}
}
}

// GetMD returns the metadata of a node in the tree
func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) {
md, err := os.Stat(n.InternalPath())