Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix writing 0 byte msgpack metadata #4033

Merged
merged 15 commits into from
Jul 4, 2023
Merged
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-0-byte-msgpack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: fix writing 0 byte msgpack metadata

File metadata is now written atomically to be more resilient during timeouts

https://github.com/cs3org/reva/pull/4033
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/renameio/v2 v2.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.13.0 // indirect
github.com/hashicorp/consul/api v1.15.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,8 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg=
github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string) bool) (err error) {
// Acquire a read log on the source node
// write lock existing node before reading treesize or tree time
f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src))
lock, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(src), os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
Expand All @@ -293,15 +293,15 @@ func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter f
return errors.Wrap(err, "xattrs: Unable to lock source to read")
}
defer func() {
rerr := f.Close()
rerr := lock.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, f)
return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, lock)
}

// CopyMetadataWithSourceLock copies all extended attributes from source to target.
Expand All @@ -312,11 +312,11 @@ func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, ta
switch {
case lockedSource == nil:
return errors.New("no lock provided")
case lockedSource.File.Name() != lu.MetadataBackend().MetadataPath(sourcePath):
case lockedSource.File.Name() != lu.MetadataBackend().LockfilePath(sourcePath):
return errors.New("lockpath does not match filepath")
}

attrs, err := lu.metadataBackend.AllWithLockedSource(ctx, sourcePath, lockedSource)
attrs, err := lu.metadataBackend.All(ctx, sourcePath)
if err != nil {
return err
}
Expand Down
83 changes: 45 additions & 38 deletions pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ package metadata

import (
"context"
"errors"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/google/renameio/v2"
"github.com/pkg/xattr"
"github.com/rogpeppe/go-internal/lockedfile"
"github.com/shamaton/msgpack/v2"
Expand Down Expand Up @@ -142,74 +145,65 @@ func (b MessagePackBackend) saveAttributes(ctx context.Context, path string, set
span.End()
}()

lockPath := b.LockfilePath(path)
metaPath := b.MetadataPath(path)
if acquireLock {
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
f, err = lockedfile.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
} else {
_, subspan := tracer.Start(ctx, "os.OpenFile")
f, err = os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0600)
f, err = lockedfile.OpenFile(lockPath, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
defer f.Close()
}
if err != nil {
return err
}
defer f.Close()

// Invalidate cache early
_, subspan := tracer.Start(ctx, "metaCache.RemoveMetadata")
_ = b.metaCache.RemoveMetadata(b.cacheKey(path))
subspan.End()

// Read current state
_, subspan = tracer.Start(ctx, "io.ReadAll")
_, subspan = tracer.Start(ctx, "os.ReadFile")
var msgBytes []byte
msgBytes, err = io.ReadAll(f)
msgBytes, err = os.ReadFile(metaPath)
subspan.End()
if err != nil {
return err
}
attribs := map[string][]byte{}
if len(msgBytes) > 0 {
switch {
case err != nil:
if !errors.Is(err, fs.ErrNotExist) {
return err
}
case len(msgBytes) == 0:
// ugh. an empty file? bail out
return errors.New("encountered empty metadata file")
default:
// only unmarshal if we read data
err = msgpack.Unmarshal(msgBytes, &attribs)
if err != nil {
return err
}
}

// set new metadata
// prepare metadata
for key, val := range setAttribs {
attribs[key] = val
}
for _, key := range deleteAttribs {
delete(attribs, key)
}

// Truncate file
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return err
}
_, subspan = tracer.Start(ctx, "f.Truncate")
err = f.Truncate(0)
subspan.End()
if err != nil {
return err
}

// Write new metadata to file
var d []byte
d, err = msgpack.Marshal(attribs)
if err != nil {
return err
}
_, subspan = tracer.Start(ctx, "f.Write")
_, err = f.Write(d)
subspan.End()

// overwrite file atomically
_, subspan = tracer.Start(ctx, "renameio.Writefile")
err = renameio.WriteFile(metaPath, d, 0600)
if err != nil {
return err
}
subspan.End()

_, subspan = tracer.Start(ctx, "metaCache.PushToCache")
err = b.metaCache.PushToCache(b.cacheKey(path), attribs)
Expand All @@ -227,9 +221,13 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
}

metaPath := b.MetadataPath(path)
var msgBytes []byte

if source == nil {
_, subspan := tracer.Start(ctx, "lockedfile.Open")
source, err = lockedfile.Open(metaPath)
// // No cached entry found. Read from storage and store in cache
_, subspan := tracer.Start(ctx, "os.OpenFile")
// source, err = lockedfile.Open(metaPath)
source, err = os.Open(metaPath)
subspan.End()
// // No cached entry found. Read from storage and store in cache
if err != nil {
Expand All @@ -246,12 +244,16 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
return attribs, nil // no attributes set yet
}
}
defer source.(*lockedfile.File).Close()
_, subspan = tracer.Start(ctx, "io.ReadAll")
msgBytes, err = io.ReadAll(source)
source.(*os.File).Close()
subspan.End()
} else {
_, subspan := tracer.Start(ctx, "io.ReadAll")
msgBytes, err = io.ReadAll(source)
subspan.End()
}

_, subspan := tracer.Start(ctx, "io.ReadAll")
msgBytes, err := io.ReadAll(source)
subspan.End()
if err != nil {
return nil, err
}
Expand All @@ -262,7 +264,7 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
}
}

_, subspan = tracer.Start(ctx, "metaCache.PushToCache")
_, subspan := tracer.Start(ctx, "metaCache.PushToCache")
err = b.metaCache.PushToCache(b.cacheKey(path), attribs)
subspan.End()
if err != nil {
Expand All @@ -273,7 +275,9 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
}

// IsMetaFile returns whether the given path represents a meta file
func (MessagePackBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".mpk") }
func (MessagePackBackend) IsMetaFile(path string) bool {
return strings.HasSuffix(path, ".mpk") || strings.HasSuffix(path, ".mpk.lock")
}

// Purge purges the data of a given path
func (b MessagePackBackend) Purge(path string) error {
Expand Down Expand Up @@ -304,6 +308,9 @@ func (b MessagePackBackend) Rename(oldPath, newPath string) error {
// MetadataPath returns the path of the file holding the metadata for the given path
func (MessagePackBackend) MetadataPath(path string) string { return path + ".mpk" }

// LockfilePath returns the path of the lock file
func (MessagePackBackend) LockfilePath(path string) string { return path + ".mpk.lock" }

func (b MessagePackBackend) cacheKey(path string) string {
// rootPath is guaranteed to have no trailing slash
// the cache key shouldn't begin with a slash as some stores drop it which can cause
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/utils/decomposedfs/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Backend interface {
Rename(oldPath, newPath string) error
IsMetaFile(path string) bool
MetadataPath(path string) string
LockfilePath(path string) string

AllWithLockedSource(ctx context.Context, path string, source io.Reader) (map[string][]byte, error)
}
Expand Down Expand Up @@ -110,6 +111,9 @@ func (NullBackend) Rename(oldPath, newPath string) error { return errUnconfigure
// MetadataPath returns the path of the file holding the metadata for the given path
func (NullBackend) MetadataPath(path string) string { return "" }

// LockfilePath returns the path of the lock file
func (NullBackend) LockfilePath(path string) string { return "" }

// AllWithLockedSource reads all extended attributes from the given reader
// The path argument is used for storing the data in the cache
func (NullBackend) AllWithLockedSource(ctx context.Context, path string, source io.Reader) (map[string][]byte, error) {
Expand Down
22 changes: 8 additions & 14 deletions pkg/storage/utils/decomposedfs/metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ import (

var _ = Describe("Backend", func() {
var (
tmpdir string
file string
metafile string
tmpdir string
file string

backend metadata.Backend
)
Expand All @@ -46,9 +45,6 @@ var _ = Describe("Backend", func() {

JustBeforeEach(func() {
file = path.Join(tmpdir, "file")
metafile = backend.MetadataPath(file)
_, err := os.Create(metafile)
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
Expand Down Expand Up @@ -147,10 +143,9 @@ var _ = Describe("Backend", func() {
Expect(v["bar"]).To(Equal([]byte("baz")))
})

It("returns an empty map", func() {
v, err := backend.All(context.Background(), file)
Expect(err).ToNot(HaveOccurred())
Expect(v).To(Equal(map[string][]byte{}))
It("fails when the metafile does not exist", func() {
_, err := backend.All(context.Background(), file)
Expect(err).To(HaveOccurred())
})
})

Expand All @@ -165,10 +160,9 @@ var _ = Describe("Backend", func() {
Expect(v).To(ConsistOf("foo", "bar"))
})

It("returns an empty list", func() {
v, err := backend.List(context.Background(), file)
Expect(err).ToNot(HaveOccurred())
Expect(v).To(Equal([]string{}))
It("fails when the metafile does not exist", func() {
_, err := backend.List(context.Background(), file)
Expect(err).To(HaveOccurred())
})
})

Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"

"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/pkg/errors"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (XattrsBackend) Remove(ctx context.Context, filePath string, key string) (e
}

// IsMetaFile returns whether the given path represents a meta file
func (XattrsBackend) IsMetaFile(path string) bool { return false }
func (XattrsBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".meta.lock") }

// Purge purges the data of a given path
func (XattrsBackend) Purge(path string) error { return nil }
Expand All @@ -167,6 +168,9 @@ func (XattrsBackend) Rename(oldPath, newPath string) error { return nil }
// MetadataPath returns the path of the file holding the metadata for the given path
func (XattrsBackend) MetadataPath(path string) string { return path }

// LockfilePath returns the path of the lock file
func (XattrsBackend) LockfilePath(path string) string { return path + ".meta.lock" }

func cleanupLockfile(f *lockedfile.File) {
_ = f.Close()
_ = os.Remove(f.Name())
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
np := n.InternalPath()
if items, err := filepath.Glob(np + node.RevisionIDDelimiter + "*"); err == nil {
for i := range items {
if fs.lu.MetadataBackend().IsMetaFile(items[i]) {
if fs.lu.MetadataBackend().IsMetaFile(items[i]) || strings.HasSuffix(items[i], ".lock") {
continue
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
attributeName == prefixes.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to version node")
return errtypes.InternalError("failed to copy blob xattrs to version node: " + err.Error())
}

// remember mtime from node as new revision mtime
Expand All @@ -256,7 +256,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
attributeName == prefixes.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to old revision to node")
return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error())
}

revisionSize, err := fs.lu.MetadataBackend().GetInt64(ctx, restoredRevisionPath, prefixes.BlobsizeAttr)
Expand Down
14 changes: 2 additions & 12 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -750,17 +749,8 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
// lock parent before reading treesize or tree time

_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
var parentFilename string
switch t.lookup.MetadataBackend().(type) {
case metadata.MessagePackBackend:
parentFilename = t.lookup.MetadataBackend().MetadataPath(n.ParentPath())
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
case metadata.XattrsBackend:
// we have to use dedicated lockfiles to lock directories
// this only works because the xattr backend also locks folders with separate lock files
parentFilename = n.ParentPath() + filelocks.LockFileSuffix
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
}
parentFilename := t.lookup.MetadataBackend().LockfilePath(n.ParentPath())
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
if err != nil {
sublog.Error().Err(err).
Expand Down
Loading