Skip to content

Commit

Permalink
Merge pull request #4956 from aduffeck/improve-posixfs-logging
Browse files Browse the repository at this point in the history
Improve posixfs logging
  • Loading branch information
butonic authored Nov 19, 2024
2 parents a4f2d7b + 06123ec commit c9d6a2d
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 65 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/improve-posixfs-logging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Improve posixfs error handling and logging

We improved error handling and logging in the posixfs storage driver.

https://github.com/cs3org/reva/pull/4956
2 changes: 1 addition & 1 deletion pkg/storage/fs/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

trashbin, err := trashbin.New(o, lu)
trashbin, err := trashbin.New(o, lu, log)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/fs/posix/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path/filepath"

"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"

Expand Down Expand Up @@ -177,12 +178,14 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
},
)

logger := zerolog.New(os.Stderr).With().Logger()

bs := &treemocks.Blobstore{}
tree, err := tree.New(lu, bs, um, &trashbin.Trashbin{}, o, nil, store.Create(), nil)
tree, err := tree.New(lu, bs, um, &trashbin.Trashbin{}, o, nil, store.Create(), &logger)
if err != nil {
return nil, err
}
tb, err := trashbin.New(o, lu)
tb, err := trashbin.New(o, lu, &logger)
if err != nil {
return nil, err
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/fs/posix/trashbin/trashbin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"strings"
"time"

"github.com/google/uuid"
"github.com/rs/zerolog"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage"
Expand All @@ -34,13 +37,13 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
)

type Trashbin struct {
fs storage.FS
o *options.Options
lu *lookup.Lookup
fs storage.FS
o *options.Options
lu *lookup.Lookup
log *zerolog.Logger
}

const (
Expand All @@ -49,10 +52,11 @@ const (
)

// New returns a new Trashbin
func New(o *options.Options, lu *lookup.Lookup) (*Trashbin, error) {
func New(o *options.Options, lu *lookup.Lookup, log *zerolog.Logger) (*Trashbin, error) {
return &Trashbin{
o: o,
lu: lu,
o: o,
lu: lu,
log: log,
}, nil
}

Expand Down Expand Up @@ -261,7 +265,9 @@ func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Refere
if err != nil {
return err
}
_ = tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath)
if err := tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath); err != nil {
tb.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", string(id)).Str("path", restorePath).Msg("trashbin: error caching id")
}

// cleanup trash info
if relativePath == "." || relativePath == "/" {
Expand Down
79 changes: 63 additions & 16 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
// cases:
switch action {
case ActionCreate:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionCreate)")
if !isDir {
// 1. New file (could be emitted as part of a new directory)
// -> assimilate file
Expand Down Expand Up @@ -197,6 +198,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
}

case ActionUpdate:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionUpdate)")
// 3. Updated file
// -> update file unless parent directory is being rescanned
if !t.scanDebouncer.InProgress(filepath.Dir(path)) {
Expand All @@ -207,6 +209,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
}

case ActionMove:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMove)")
// 4. Moved file
// -> update file
// 5. Moved directory
Expand All @@ -218,6 +221,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
})

case ActionMoveFrom:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMoveFrom)")
// 6. file/directory moved out of the watched directory
// -> update directory
if err := t.setDirty(filepath.Dir(path), true); err != nil {
Expand All @@ -227,9 +231,16 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }()

case ActionDelete:
_ = t.HandleFileDelete(path)
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("handling deleted item")

// 7. Deleted file or directory
// -> update parent and all children

err := t.HandleFileDelete(path)
if err != nil {
return err
}

t.scanDebouncer.Debounce(scanItem{
Path: filepath.Dir(path),
ForceRescan: true,
Expand All @@ -242,8 +253,12 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {

func (t *Tree) HandleFileDelete(path string) error {
// purge metadata
_ = t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path)
_ = t.lookup.MetadataBackend().Purge(context.Background(), path)
if err := t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path); err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not delete id cache entry by path")
}
if err := t.lookup.MetadataBackend().Purge(context.Background(), path); err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not purge metadata")
}

// send event
owner, spaceID, nodeID, parentID, err := t.getOwnerAndIDs(filepath.Dir(path))
Expand Down Expand Up @@ -369,31 +384,48 @@ func (t *Tree) assimilate(item scanItem) error {
if ok && len(previousParentID) > 0 && previousPath != item.Path {
_, err := os.Stat(previousPath)
if err == nil {
// this id clashes with an existing id -> clear metadata and re-assimilate
// this id clashes with an existing item -> clear metadata and re-assimilate
t.log.Debug().Str("path", item.Path).Msg("ID clash detected, purging metadata and re-assimilating")

_ = t.lookup.MetadataBackend().Purge(context.Background(), item.Path)
if err := t.lookup.MetadataBackend().Purge(context.Background(), item.Path); err != nil {
t.log.Error().Err(err).Str("path", item.Path).Msg("could not purge metadata")
}
go func() {
_ = t.assimilate(scanItem{Path: item.Path, ForceRescan: true})
if err := t.assimilate(scanItem{Path: item.Path, ForceRescan: true}); err != nil {
t.log.Error().Err(err).Str("path", item.Path).Msg("could not re-assimilate")
}
}()
} else {
// this is a move
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
t.log.Debug().Str("path", item.Path).Msg("move detected")

if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id")
}
_, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
return err
}

// purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair.
_ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath)
_ = t.lookup.MetadataBackend().Purge(context.Background(), previousPath)
if err := t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath); err != nil {
t.log.Error().Err(err).Str("path", previousPath).Msg("could not delete id cache entry by path")
}
if err := t.lookup.MetadataBackend().Purge(context.Background(), previousPath); err != nil {
t.log.Error().Err(err).Str("path", previousPath).Msg("could not purge metadata")
}

fi, err := os.Stat(item.Path)
if err != nil {
return err
}
if fi.IsDir() {
// if it was moved and it is a directory we need to propagate the move
go func() { _ = t.WarmupIDCache(item.Path, false, true) }()
go func() {
if err := t.WarmupIDCache(item.Path, false, true); err != nil {
t.log.Error().Err(err).Str("path", item.Path).Msg("could not warmup id cache")
}
}()
}

parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
Expand Down Expand Up @@ -426,14 +458,18 @@ func (t *Tree) assimilate(item scanItem) error {
}
} else {
// This item had already been assimilated in the past. Update the path
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
t.log.Debug().Str("path", item.Path).Msg("updating cached path")
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id")
}

_, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
return err
}
}
} else {
t.log.Debug().Str("path", item.Path).Msg("new item detected")
// assimilate new file
newId := uuid.New().String()
fi, err := t.updateFile(item.Path, newId, spaceID)
Expand Down Expand Up @@ -550,12 +586,15 @@ assimilate:
return nil, errors.Wrap(err, "failed to propagate")
}

t.log.Debug().Str("path", path).Interface("attributes", attributes).Msg("setting attributes")
err = t.lookup.MetadataBackend().SetMultiple(context.Background(), path, attributes, false)
if err != nil {
return nil, errors.Wrap(err, "failed to set attributes")
}

_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path)
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", path).Msg("could not cache id")
}

return fi, nil
}
Expand Down Expand Up @@ -654,20 +693,28 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
_ = t.assimilate(scanItem{Path: path, ForceRescan: true})
}
}
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path)
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path); err != nil {
t.log.Error().Err(err).Str("spaceID", string(spaceID)).Str("id", string(id)).Str("path", path).Msg("could not cache id")
}
}
} else if assimilate {
_ = t.assimilate(scanItem{Path: path, ForceRescan: true})
if err := t.assimilate(scanItem{Path: path, ForceRescan: true}); err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not assimilate item")
}
}
return t.setDirty(path, false)
})

for dir, size := range sizes {
if dir == root {
// Propagate the size diff further up the tree
_ = t.propagateSizeDiff(dir, size)
if err := t.propagateSizeDiff(dir, size); err != nil {
t.log.Error().Err(err).Str("path", dir).Msg("could not propagate size diff")
}
}
if err := t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size))); err != nil {
t.log.Error().Err(err).Str("path", dir).Int64("size", size).Msg("could not set tree size")
}
_ = t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size)))
}

if err != nil {
Expand Down
40 changes: 26 additions & 14 deletions pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import (
"os"
"strconv"
"time"

"github.com/rs/zerolog"
)

type GpfsFileAuditLoggingWatcher struct {
tree *Tree
log *zerolog.Logger
}

type lwe struct {
Expand All @@ -37,9 +40,10 @@ type lwe struct {
BytesWritten string
}

func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string) (*GpfsFileAuditLoggingWatcher, error) {
func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string, log *zerolog.Logger) (*GpfsFileAuditLoggingWatcher, error) {
w := &GpfsFileAuditLoggingWatcher{
tree: tree,
log: log,
}

_, err := os.Stat(auditLogFile)
Expand Down Expand Up @@ -75,25 +79,33 @@ start:
case nil:
err := json.Unmarshal([]byte(line), ev)
if err != nil {
w.log.Error().Err(err).Str("line", line).Msg("error unmarshalling line")
continue
}
if isLockFile(ev.Path) || isTrash(ev.Path) || w.tree.isUpload(ev.Path) {
continue
}
switch ev.Event {
case "CREATE":
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false) }()
case "CLOSE":
bytesWritten, err := strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false) }()
go func() {
switch ev.Event {
case "CREATE":
err = w.tree.Scan(ev.Path, ActionCreate, false)
case "CLOSE":
var bytesWritten int
bytesWritten, err = strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
err = w.tree.Scan(ev.Path, ActionUpdate, false)
}
case "RENAME":
err = w.tree.Scan(ev.Path, ActionMove, false)
if warmupErr := w.tree.WarmupIDCache(ev.Path, false, false); warmupErr != nil {
w.log.Error().Err(warmupErr).Str("path", ev.Path).Msg("error warming up id cache")
}
}
case "RENAME":
go func() {
_ = w.tree.Scan(ev.Path, ActionMove, false)
_ = w.tree.WarmupIDCache(ev.Path, false, false)
}()
}
if err != nil {
w.log.Error().Err(err).Str("line", line).Msg("error unmarshalling line")
}
}()

case io.EOF:
time.Sleep(1 * time.Second)
default:
Expand Down
Loading

0 comments on commit c9d6a2d

Please sign in to comment.