Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Added FS abstraction, rework of State tests #12592

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions erigon-lib/common/customfs/customfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
package customfs

import (
"errors"
"github.com/spf13/afero"
"io"
"io/fs"
"os"
"slices"
"strconv"
"strings"
"sync/atomic"
"syscall"
)

var CFS = CustomFileSystem{afero.NewOsFs(), atomic.Int32{}}

type CustomFileSystem struct {
afero.Fs
TmpCounter atomic.Int32
}

type IoWrapper struct {
CustomFileSystem
}

func (fs *IoWrapper) Open(name string) (fs.File, error) {
return fs.Fs.Open(name)
}

type CustomFile struct {
afero.File
}

func (fs *CustomFileSystem) IsNotExist(err error) bool {
return underlyingErrorIs(err, afero.ErrFileNotFound)
}

func underlyingErrorIs(err, target error) bool {
// Note that this function is not errors.Is:
// underlyingError only unwraps the specific error-wrapping types
// that it historically did, not all errors implementing Unwrap().
err = underlyingError(err)
if err == target { //nolint
return true
}
// To preserve prior behavior, only examine syscall errors.
e, ok := err.(syscallErrorType) //nolint
return ok && e.Is(target)
}

// underlyingError returns the underlying error for known os error types.
func underlyingError(err error) error {
switch err := err.(type) { //nolint
case *os.PathError:
return err.Err
case *os.LinkError:
return err.Err
case *os.SyscallError:
return err.Err
}
return err
}

type syscallErrorType = syscall.Errno

func (fs *CustomFileSystem) Create(name string) (*CustomFile, error) {
file, err := fs.Fs.Create(name)
return &CustomFile{file}, err
}

func (f *CustomFile) ReadFrom(r io.Reader) (n int64, err error) {
if err := f.checkValid("write"); err != nil {
return 0, err
}
return genericReadFrom(f, r) // without wrapping
}

func genericReadFrom(f *CustomFile, r io.Reader) (int64, error) {
return io.Copy(fileWithoutReadFrom{CustomFile: f}, r)
}

// noReadFrom can be embedded alongside another type to
// hide the ReadFrom method of that other type.
type noReadFrom struct{} //nolint

// ReadFrom hides another ReadFrom method.
// It should never be called.
func (noReadFrom) ReadFrom(io.Reader) (int64, error) { //nolint
panic("can't happen")
}

// fileWithoutReadFrom implements all the methods of *File other
// than ReadFrom. This is used to permit ReadFrom to call io.Copy
// without leading to a recursive call to ReadFrom.
type fileWithoutReadFrom struct {
noReadFrom //nolint
*CustomFile
}

func (f *CustomFile) checkValid(op string) error {
if f == nil {
return os.ErrInvalid
}
return nil
}

// MkdirTemp creates a new temporary directory in the directory dir
// and returns the pathname of the new directory.
// The new directory's name is generated by adding a random string to the end of pattern.
// If pattern includes a "*", the random string replaces the last "*" instead.
// The directory is created with mode 0o700 (before umask).
// If dir is the empty string, MkdirTemp uses the default directory for temporary files, as returned by TempDir.
// Multiple programs or goroutines calling MkdirTemp simultaneously will not choose the same directory.
// It is the caller's responsibility to remove the directory when it is no longer needed.
func (fs *CustomFileSystem) MkdirTemp(dir, pattern string) (string, error) {
if _, ok := fs.Fs.(*afero.OsFs); ok {
return os.MkdirTemp(dir, pattern)
}
if dir == "" {
dir = "tmp"
}

prefix, suffix, err := prefixAndSuffix(pattern)
if err != nil {
return "", &os.PathError{Op: "mkdirtemp", Path: pattern, Err: err}
}
prefix = joinPath(dir, prefix)

try := 0
for {
tmpCounter := fs.TmpCounter.Add(1) - 1
name := prefix + strconv.Itoa(int(tmpCounter)) + suffix
err := fs.Mkdir(name, 0700)
if err == nil {
return name, nil
}
if fs.IsExist(err) {
if try++; try < 10000 {
continue
}
return "", &os.PathError{Op: "mkdirtemp", Path: dir, Err: os.ErrExist}
}
if fs.IsNotExist(err) {
if _, err := fs.Stat(dir); fs.IsNotExist(err) {
return "", err
}
}
return "", err
}
}

func (fs *CustomFileSystem) CreateTemp(dir, pattern string) (*CustomFile, error) {
if _, ok := fs.Fs.(*afero.OsFs); ok {
file, err := os.CreateTemp(dir, pattern)
return &CustomFile{file}, err
}
if dir == "" {
dir = "tmp"
}

prefix, suffix, err := prefixAndSuffix(pattern)
if err != nil {
return nil, &os.PathError{Op: "createtemp", Path: pattern, Err: err}
}
prefix = joinPath(dir, prefix)

try := 0
for {
tmpCounter := fs.TmpCounter.Add(1) - 1
name := prefix + strconv.Itoa(int(tmpCounter)) + suffix
f, err := fs.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
if fs.IsExist(err) {
if try++; try < 10000 {
continue
}
return nil, &os.PathError{Op: "createtemp", Path: prefix + "*" + suffix, Err: os.ErrExist}
}
return &CustomFile{f}, err
}
}

func (fs *CustomFileSystem) IsExist(err error) bool {
return underlyingErrorIs(err, os.ErrExist)
}

// prefixAndSuffix splits pattern by the last wildcard "*", if applicable,
// returning prefix as the part before "*" and suffix as the part after "*".
func prefixAndSuffix(pattern string) (prefix, suffix string, err error) {
for i := 0; i < len(pattern); i++ {
if os.IsPathSeparator(pattern[i]) {
return "", "", errors.New("separator")
}
}
if pos := strings.LastIndexByte(pattern, '*'); pos != -1 {
prefix, suffix = pattern[:pos], pattern[pos+1:]
} else {
prefix = pattern
}
return prefix, suffix, nil
}

func joinPath(dir, name string) string {
if len(dir) > 0 && os.IsPathSeparator(dir[len(dir)-1]) {
return dir + name
}
return dir + string(os.PathSeparator) + name
}

func (fs *CustomFileSystem) ReadDir(name string) ([]os.FileInfo, error) {
f, err := fs.Open(name)
if err != nil {
return nil, err
}
defer f.Close()

dirs, err := f.Readdir(-1)
slices.SortFunc(dirs, func(a, b os.FileInfo) int {
return strings.Compare(a.Name(), b.Name())
})
return dirs, err
}

// ReadFile reads the named file and returns the contents.
// A successful call returns err == nil, not err == EOF.
// Because ReadFile reads the whole file, it does not treat an EOF from Read
// as an error to be reported.
func (fs *CustomFileSystem) ReadFile(name string) ([]byte, error) {
f, err := fs.Open(name)
if err != nil {
return nil, err
}
defer f.Close()

var size int
if info, err := f.Stat(); err == nil {
size64 := info.Size()
if int64(int(size64)) == size64 {
size = int(size64)
}
}
size++ // one byte for final read at EOF

// If a file claims a small size, read at least 512 bytes.
// In particular, files in Linux's /proc claim size 0 but
// then do not work right if read in small pieces,
// so an initial read of 1 byte would not work correctly.
if size < 512 {
size = 512
}

data := make([]byte, 0, size)
for {
n, err := f.Read(data[len(data):cap(data)])
data = data[:len(data)+n]
if err != nil {
if err == io.EOF { //nolint
err = nil
}
return data, err
}

if len(data) >= cap(data) {
d := append(data[:cap(data)], 0)
data = d[:len(data)]
}
}
}

func (fs *CustomFileSystem) WriteFile(name string, data []byte, perm os.FileMode) error {
f, err := fs.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
_, err = f.Write(data)
if err1 := f.Close(); err1 != nil && err == nil {
err = err1
}
return err
}

func (fs *CustomFileSystem) DirFS(dir string) fs.FS {
if _, ok := fs.Fs.(*afero.OsFs); ok {
return os.DirFS(dir)
}
return &IoWrapper{CustomFileSystem{afero.NewBasePathFs(fs.Fs, dir), atomic.Int32{}}}
}
15 changes: 7 additions & 8 deletions erigon-lib/common/datadir/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package datadir
import (
"errors"
"fmt"
"os"
"github.com/erigontech/erigon-lib/common/customfs"
"github.com/gofrs/flock"
"path/filepath"
"syscall"

"github.com/gofrs/flock"

"github.com/erigontech/erigon-lib/common/dir"
)

Expand Down Expand Up @@ -163,7 +162,7 @@ func downloaderV2Migration(dirs Dirs) error {
return nil
}
from, to := filepath.Join(dirs.Snap, "db", "mdbx.dat"), filepath.Join(dirs.Downloader, "mdbx.dat")
if err := os.Rename(from, to); err != nil {
if err := customfs.CFS.Rename(from, to); err != nil {
//fall back to copy-file if folders are on different disks
if err := CopyFile(from, to); err != nil {
return err
Expand All @@ -173,24 +172,24 @@ func downloaderV2Migration(dirs Dirs) error {
}

func CopyFile(from, to string) error {
r, err := os.Open(from)
r, err := customfs.CFS.Open(from)
if err != nil {
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
defer r.Close()
w, err := os.Create(to)
w, err := customfs.CFS.Create(to)
if err != nil {
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
defer w.Close()
if _, err = w.ReadFrom(r); err != nil {
w.Close()
os.Remove(to)
customfs.CFS.Remove(to)
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
if err = w.Sync(); err != nil {
w.Close()
os.Remove(to)
customfs.CFS.Remove(to)
return fmt.Errorf("please manually move file: from %s to %s. error: %w", from, to, err)
}
return nil
Expand Down
Loading
Loading