diff --git a/config/config.go b/config/config.go index 842d8c26fd0..9b24efd5043 100644 --- a/config/config.go +++ b/config/config.go @@ -30,7 +30,7 @@ const ( DefaultBlockStoreS3StreamingChunkSize = 2 << 19 // 1MiB by default per chunk DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first - DefaultDiskAllocatedSize = 1 * 1024 * 1024 * 1024 + DefaultDiskAllocatedBytes = 1 * 1024 * 1024 * 1024 DefaultDiskBaseDir = "~/lakefs/metadata" DefaultDiskBlockStoragePrefix = "_lakefs" @@ -94,7 +94,7 @@ func setDefaults() { viper.SetDefault("blockstore.s3.streaming_chunk_timeout", DefaultBlockStoreS3StreamingChunkTimeout) viper.SetDefault("blockstore.s3.max_retries", DefaultS3MaxRetries) - viper.SetDefault("disk.allocated_size", DefaultDiskAllocatedSize) + viper.SetDefault("disk.allocated_bytes", DefaultDiskAllocatedBytes) viper.SetDefault("disk.base_dir", DefaultDiskBaseDir) viper.SetDefault("disk.block_storage_prefix", DefaultDiskBlockStoragePrefix) @@ -119,7 +119,7 @@ func (c *Config) GetDatabaseParams() dbparams.Database { func (c *Config) GetLocalDiskParams() pyramid.Params { return pyramid.Params{ - AllocatedSize: viper.GetInt64("disk.allocated_size"), + AllocatedBytes: viper.GetInt64("disk.allocated_bytes"), BaseDir: viper.GetString("disk.base_dir"), BlockStoragePrefix: viper.GetString("disk.block_storage_prefix"), } diff --git a/pyramid/eviction.go b/pyramid/eviction.go index c94f6a477a1..a5ca5995c3a 100644 --- a/pyramid/eviction.go +++ b/pyramid/eviction.go @@ -6,13 +6,23 @@ import ( "github.com/dgraph-io/ristretto" ) -type evictionControl struct { +// evictionControl is in charge of limiting the size of local files in TierFS. +type evictionControl interface { + // touch updates the last time the path was read. + touch(rPath relativePath) + + // store updates the eviction control with the path and the file size. + store(rPath relativePath, filesize int64) +} + +type lruSizeEviction struct { cache *ristretto.Cache } -func newEvictionControl(capacity, estimatedFileSize int64, evict func(rPath relativePath)) (*evictionControl, error) { +func newLRUSizeEviction(capacity, estimatedFileBytes int64, evict func(rPath relativePath)) (*lruSizeEviction, error) { cache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: 10 * capacity / estimatedFileSize, + // Per ristretto, this is the optimized counters num + NumCounters: int64(10.0 * float64(capacity) / float64(estimatedFileBytes)), MaxCost: capacity, Metrics: false, OnEvict: onEvict(evict), @@ -20,7 +30,7 @@ func newEvictionControl(capacity, estimatedFileSize int64, evict func(rPath rela if err != nil { return nil, fmt.Errorf("creating cache: %w", err) } - return &evictionControl{ + return &lruSizeEviction{ cache: cache, }, nil } @@ -31,11 +41,10 @@ func onEvict(evict func(rPath relativePath)) func(uint64, uint64, interface{}, i } } -// touch updates last access time for the file -func (am *evictionControl) touch(rPath relativePath) { +func (am *lruSizeEviction) touch(rPath relativePath) { am.cache.Get(rPath) } -func (am *evictionControl) store(rPath relativePath, filesize int64) { +func (am *lruSizeEviction) store(rPath relativePath, filesize int64) { am.cache.Set(rPath, rPath, filesize) } diff --git a/pyramid/file.go b/pyramid/file.go index e04796e81a8..6512becfa95 100644 --- a/pyramid/file.go +++ b/pyramid/file.go @@ -1,32 +1,45 @@ package pyramid -import "os" +import ( + "errors" + "os" +) // File is pyramid wrapper for os.file that triggers pyramid hooks for file actions. type File struct { - fh *os.File - access *evictionControl + fh *os.File + eviction *lruSizeEviction - rPath relativePath + readOnly bool + rPath relativePath - close func(size int64) error - size int64 + store func() error } +var ErrReadOnlyFile = errors.New("file is read-only") + func (f *File) Read(p []byte) (n int, err error) { - f.access.touch(f.rPath) + if !f.readOnly { + // file is being written, eviction policies don't apply to it + f.eviction.touch(f.rPath) + } return f.fh.Read(p) } func (f *File) ReadAt(p []byte, off int64) (n int, err error) { - f.access.touch(f.rPath) + if !f.readOnly { + // file is being written, eviction policies don't apply to it + f.eviction.touch(f.rPath) + } return f.fh.ReadAt(p, off) } func (f *File) Write(p []byte) (n int, err error) { - s, err := f.fh.Write(p) - f.size += int64(s) - return s, err + if f.readOnly { + return 0, ErrReadOnlyFile + } + + return f.fh.Write(p) } func (f *File) Stat() (os.FileInfo, error) { @@ -38,11 +51,12 @@ func (f *File) Sync() error { } func (f *File) Close() error { - if f.close != nil { - if err := f.close(f.size); err != nil { - return err - } + if err := f.fh.Close(); err != nil { + return err } - return f.fh.Close() + if f.store == nil { + return nil + } + return f.store() } diff --git a/pyramid/params.go b/pyramid/params.go index 97b1cc14f5b..e0306692e23 100644 --- a/pyramid/params.go +++ b/pyramid/params.go @@ -3,8 +3,8 @@ package pyramid // Params is pyramid.FS params that are identical for all file-systems // in a single lakeFS instance. type Params struct { - // AllocatedSize is the disk size in bytes that lakeFS is allowed to use. - AllocatedSize int64 + // AllocatedBytes is the disk size in bytes that lakeFS is allowed to use. + AllocatedBytes int64 // BaseDir is the local directory where lakeFS app is storing the files. BaseDir string diff --git a/pyramid/tierFS.go b/pyramid/tierFS.go index e436d978d9a..44e0e3a6528 100644 --- a/pyramid/tierFS.go +++ b/pyramid/tierFS.go @@ -12,28 +12,43 @@ import ( // ImmutableTierFS is a filesystem where written files are never edited. // All files are stored in the block storage. Local paths are treated as a -// cache layer that will be evicted according to the given eviction algorithm. +// cache layer that will be evicted according to the eviction control. type ImmutableTierFS struct { adaptor block.Adapter - eviction *evictionControl + eviction *lruSizeEviction fsName string fsLocalBaseDir string - remotePrefix string + + remotePrefix string } type Config struct { - fsName string + // fsName is the unique filesystem name for this TierFS instance. + // If two TierFS instances have the same name, behaviour is undefined. + fsName string + adaptor block.Adapter + // Prefix for all metadata file lakeFS stores in the block storage. fsBlockStoragePrefix string - localBaseDir string - allocatedDiskSize int64 - estimatedFilesize int64 + // The directory where TierFS files are kept locally. + localBaseDir string + + // Maximum number of bytes an instance of TierFS can allocate to local files. + // This is not a hard limit - there might be short period of times where TierFS + // uses more disk due to ongoing writes and slow disk cleanups. + allocatedDiskBytes int64 + + // The average estimated file size in bytes. + // Useful for minimizing memory consumption of the files in-mem cache. + estimatedFileBytes int64 } +const workspaceDir = "workspace" + // NewTierFS creates a new TierFS. // It will traverse the existing local folders and will update // the local disk cache to reflect existing files. @@ -49,32 +64,51 @@ func NewTierFS(c *Config) (FS, error) { fsLocalBaseDir: fsLocalBaseDir, remotePrefix: path.Join(c.fsBlockStoragePrefix, c.fsName), } - eviction, err := newEvictionControl(c.allocatedDiskSize, c.estimatedFilesize, tierFS.removeFromLocal) + eviction, err := newLRUSizeEviction(c.allocatedDiskBytes, c.estimatedFileBytes, tierFS.removeFromLocal) if err != nil { return nil, fmt.Errorf("creating eviction control :%w", err) } - if err := addExistingFiles(eviction, fsLocalBaseDir); err != nil { - return nil, fmt.Errorf("adding existing files to eviction:%w", err) + if err := handleExistingFiles(eviction, fsLocalBaseDir); err != nil { + return nil, fmt.Errorf("handling existing files: %w", err) } tierFS.eviction = eviction return tierFS, nil } -func addExistingFiles(eviction *evictionControl, dir string) error { - return filepath.Walk(dir, func(rPath string, info os.FileInfo, err error) error { +// handleExistingFiles should only be called during init of the TierFS. +// It does 2 things: +// 1. Adds stored files to the eviction control +// 2. Remove workspace directories and all its content if it +// exist under the namespace dir. +func handleExistingFiles(eviction *lruSizeEviction, dir string) error { + var workspaceDirs []string + if err := filepath.Walk(dir, func(rPath string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { - // nothing to do with dirs + if info.Name() == workspaceDir { + // skipping workspaces and saving them for later delete + workspaceDirs = append(workspaceDirs, rPath) + } return nil } eviction.store(relativePath(rPath), info.Size()) return nil - }) + }); err != nil { + return fmt.Errorf("walking the fs dir: %w", err) + } + + for _, dir := range workspaceDirs { + if err := os.RemoveAll(dir); err != nil { + return fmt.Errorf("removing dir: %w", err) + } + } + + return nil } func (tfs *ImmutableTierFS) removeFromLocal(rPath relativePath) { @@ -82,7 +116,12 @@ func (tfs *ImmutableTierFS) removeFromLocal(rPath relativePath) { } // Store adds the local file to the FS. +// on successful operation, file will no longer be available under the originalPath. func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) error { + return tfs.store(namespace, originalPath, filename) +} + +func (tfs *ImmutableTierFS) store(namespace, originalPath, filename string) error { f, err := os.Open(originalPath) if err != nil { return fmt.Errorf("open file: %w", err) @@ -101,7 +140,7 @@ func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) erro return fmt.Errorf("closing file: %w", err) } - if err := tfs.createNSDir(namespace); err != nil { + if err := tfs.createNSWorkspaceDir(namespace); err != nil { return fmt.Errorf("create namespace dir: %w", err) } @@ -114,32 +153,30 @@ func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) erro return nil } +// Create creates a new file in TierFS. +// File isn't stored in TierFS until a successful close operation. +// Open(namespace, filename) calls will return an error before the close was called. func (tfs *ImmutableTierFS) Create(namespace, filename string) (*File, error) { - if err := tfs.createNSDir(namespace); err != nil { + if err := tfs.createNSWorkspaceDir(namespace); err != nil { return nil, fmt.Errorf("create namespace dir: %w", err) } - fileRef := tfs.newLocalFileRef(namespace, filename) - fh, err := os.Create(fileRef.fullPath) + tempPath := tfs.workspaceFilePath(namespace, filename) + fh, err := os.Create(tempPath) if err != nil { return nil, fmt.Errorf("creating file: %w", err) } return &File{ - fh: fh, - rPath: fileRef.fsRelativePath, - access: tfs.eviction, - close: tfs.adapterStore(fileRef, fh), + fh: fh, + readOnly: false, + eviction: tfs.eviction, + store: func() error { + return tfs.store(namespace, tempPath, filename) + }, }, nil } -func (tfs *ImmutableTierFS) adapterStore(fileRef localFileRef, reader io.Reader) func(size int64) error { - return func(size int64) error { - tfs.eviction.store(fileRef.fsRelativePath, size) - return tfs.adaptor.Put(tfs.objPointer(fileRef.namespace, fileRef.filename), size, reader, block.PutOpts{}) - } -} - // Load returns the a file descriptor to the local file. // If the file is missing from the local disk, it will try to fetch it from the block storage. func (tfs *ImmutableTierFS) Open(namespace, filename string) (*File, error) { @@ -169,9 +206,10 @@ func (tfs *ImmutableTierFS) openFile(fileRef localFileRef, fh *os.File) (*File, tfs.eviction.store(fileRef.fsRelativePath, stat.Size()) return &File{ - fh: fh, - rPath: fileRef.fsRelativePath, - access: tfs.eviction, + fh: fh, + readOnly: true, + rPath: fileRef.fsRelativePath, + eviction: tfs.eviction, }, nil } @@ -227,10 +265,6 @@ func (tfs *ImmutableTierFS) newLocalFileRef(namespace, filename string) localFil } } -func (tfs *ImmutableTierFS) blockStoragePath(filename string) string { - return path.Join(tfs.remotePrefix, filename) -} - func (tfs *ImmutableTierFS) objPointer(namespace, filename string) block.ObjectPointer { return block.ObjectPointer{ StorageNamespace: namespace, @@ -238,7 +272,18 @@ func (tfs *ImmutableTierFS) objPointer(namespace, filename string) block.ObjectP } } -func (tfs *ImmutableTierFS) createNSDir(namespace string) error { - dir := path.Join(tfs.fsLocalBaseDir, namespace) - return os.MkdirAll(dir, os.ModePerm) +func (tfs *ImmutableTierFS) blockStoragePath(filename string) string { + return path.Join(tfs.remotePrefix, filename) +} + +func (tfs *ImmutableTierFS) createNSWorkspaceDir(namespace string) error { + return os.MkdirAll(tfs.workspaceDirPath(namespace), os.ModePerm) +} + +func (tfs *ImmutableTierFS) workspaceDirPath(namespace string) string { + return path.Join(tfs.fsLocalBaseDir, namespace, workspaceDir) +} + +func (tfs *ImmutableTierFS) workspaceFilePath(namespace string, filename string) string { + return path.Join(tfs.workspaceDirPath(namespace), filename) }