Skip to content

Commit

Permalink
PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 committed Dec 1, 2020
1 parent 121283c commit 77ed7cb
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 67 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
DefaultBlockStoreS3StreamingChunkSize = 2 << 19 // 1MiB by default per chunk
DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first

DefaultDiskAllocatedSize = 1 * 1024 * 1024 * 1024
DefaultDiskAllocatedBytes = 1 * 1024 * 1024 * 1024
DefaultDiskBaseDir = "~/lakefs/metadata"
DefaultDiskBlockStoragePrefix = "_lakefs"

Expand Down Expand Up @@ -94,7 +94,7 @@ func setDefaults() {
viper.SetDefault("blockstore.s3.streaming_chunk_timeout", DefaultBlockStoreS3StreamingChunkTimeout)
viper.SetDefault("blockstore.s3.max_retries", DefaultS3MaxRetries)

viper.SetDefault("disk.allocated_size", DefaultDiskAllocatedSize)
viper.SetDefault("disk.allocated_bytes", DefaultDiskAllocatedBytes)
viper.SetDefault("disk.base_dir", DefaultDiskBaseDir)
viper.SetDefault("disk.block_storage_prefix", DefaultDiskBlockStoragePrefix)

Expand All @@ -119,7 +119,7 @@ func (c *Config) GetDatabaseParams() dbparams.Database {

func (c *Config) GetLocalDiskParams() pyramid.Params {
return pyramid.Params{
AllocatedSize: viper.GetInt64("disk.allocated_size"),
AllocatedBytes: viper.GetInt64("disk.allocated_bytes"),
BaseDir: viper.GetString("disk.base_dir"),
BlockStoragePrefix: viper.GetString("disk.block_storage_prefix"),
}
Expand Down
23 changes: 16 additions & 7 deletions pyramid/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,31 @@ import (
"github.com/dgraph-io/ristretto"
)

type evictionControl struct {
// evictionControl is in charge of limiting the size of local files in TierFS.
type evictionControl interface {
// touch updates the last time the path was read.
touch(rPath relativePath)

// store updates the eviction control with the path and the file size.
store(rPath relativePath, filesize int64)
}

type lruSizeEviction struct {
cache *ristretto.Cache
}

func newEvictionControl(capacity, estimatedFileSize int64, evict func(rPath relativePath)) (*evictionControl, error) {
func newLRUSizeEviction(capacity, estimatedFileBytes int64, evict func(rPath relativePath)) (*lruSizeEviction, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 10 * capacity / estimatedFileSize,
// Per ristretto, this is the optimized counters num
NumCounters: int64(10.0 * float64(capacity) / float64(estimatedFileBytes)),
MaxCost: capacity,
Metrics: false,
OnEvict: onEvict(evict),
})
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
return &evictionControl{
return &lruSizeEviction{
cache: cache,
}, nil
}
Expand All @@ -31,11 +41,10 @@ func onEvict(evict func(rPath relativePath)) func(uint64, uint64, interface{}, i
}
}

// touch updates last access time for the file
func (am *evictionControl) touch(rPath relativePath) {
func (am *lruSizeEviction) touch(rPath relativePath) {
am.cache.Get(rPath)
}

func (am *evictionControl) store(rPath relativePath, filesize int64) {
func (am *lruSizeEviction) store(rPath relativePath, filesize int64) {
am.cache.Set(rPath, rPath, filesize)
}
46 changes: 30 additions & 16 deletions pyramid/file.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,45 @@
package pyramid

import "os"
import (
"errors"
"os"
)

// File is pyramid wrapper for os.file that triggers pyramid hooks for file actions.
type File struct {
fh *os.File
access *evictionControl
fh *os.File
eviction *lruSizeEviction

rPath relativePath
readOnly bool
rPath relativePath

close func(size int64) error
size int64
store func() error
}

var ErrReadOnlyFile = errors.New("file is read-only")

func (f *File) Read(p []byte) (n int, err error) {
f.access.touch(f.rPath)
if !f.readOnly {
// file is being written, eviction policies don't apply to it
f.eviction.touch(f.rPath)
}
return f.fh.Read(p)
}

func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
f.access.touch(f.rPath)
if !f.readOnly {
// file is being written, eviction policies don't apply to it
f.eviction.touch(f.rPath)
}
return f.fh.ReadAt(p, off)
}

func (f *File) Write(p []byte) (n int, err error) {
s, err := f.fh.Write(p)
f.size += int64(s)
return s, err
if f.readOnly {
return 0, ErrReadOnlyFile
}

return f.fh.Write(p)
}

func (f *File) Stat() (os.FileInfo, error) {
Expand All @@ -38,11 +51,12 @@ func (f *File) Sync() error {
}

func (f *File) Close() error {
if f.close != nil {
if err := f.close(f.size); err != nil {
return err
}
if err := f.fh.Close(); err != nil {
return err
}

return f.fh.Close()
if f.store == nil {
return nil
}
return f.store()
}
4 changes: 2 additions & 2 deletions pyramid/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package pyramid
// Params is pyramid.FS params that are identical for all file-systems
// in a single lakeFS instance.
type Params struct {
// AllocatedSize is the disk size in bytes that lakeFS is allowed to use.
AllocatedSize int64
// AllocatedBytes is the disk size in bytes that lakeFS is allowed to use.
AllocatedBytes int64

// BaseDir is the local directory where lakeFS app is storing the files.
BaseDir string
Expand Down
123 changes: 84 additions & 39 deletions pyramid/tierFS.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,43 @@ import (

// ImmutableTierFS is a filesystem where written files are never edited.
// All files are stored in the block storage. Local paths are treated as a
// cache layer that will be evicted according to the given eviction algorithm.
// cache layer that will be evicted according to the eviction control.
type ImmutableTierFS struct {
adaptor block.Adapter
eviction *evictionControl
eviction *lruSizeEviction

fsName string

fsLocalBaseDir string
remotePrefix string

remotePrefix string
}

type Config struct {
fsName string
// fsName is the unique filesystem name for this TierFS instance.
// If two TierFS instances have the same name, behaviour is undefined.
fsName string

adaptor block.Adapter

// Prefix for all metadata file lakeFS stores in the block storage.
fsBlockStoragePrefix string
localBaseDir string

allocatedDiskSize int64
estimatedFilesize int64
// The directory where TierFS files are kept locally.
localBaseDir string

// Maximum number of bytes an instance of TierFS can allocate to local files.
// This is not a hard limit - there might be short period of times where TierFS
// uses more disk due to ongoing writes and slow disk cleanups.
allocatedDiskBytes int64

// The average estimated file size in bytes.
// Useful for minimizing memory consumption of the files in-mem cache.
estimatedFileBytes int64
}

const workspaceDir = "workspace"

// NewTierFS creates a new TierFS.
// It will traverse the existing local folders and will update
// the local disk cache to reflect existing files.
Expand All @@ -49,40 +64,64 @@ func NewTierFS(c *Config) (FS, error) {
fsLocalBaseDir: fsLocalBaseDir,
remotePrefix: path.Join(c.fsBlockStoragePrefix, c.fsName),
}
eviction, err := newEvictionControl(c.allocatedDiskSize, c.estimatedFilesize, tierFS.removeFromLocal)
eviction, err := newLRUSizeEviction(c.allocatedDiskBytes, c.estimatedFileBytes, tierFS.removeFromLocal)
if err != nil {
return nil, fmt.Errorf("creating eviction control :%w", err)
}

if err := addExistingFiles(eviction, fsLocalBaseDir); err != nil {
return nil, fmt.Errorf("adding existing files to eviction:%w", err)
if err := handleExistingFiles(eviction, fsLocalBaseDir); err != nil {
return nil, fmt.Errorf("handling existing files: %w", err)
}

tierFS.eviction = eviction
return tierFS, nil
}

func addExistingFiles(eviction *evictionControl, dir string) error {
return filepath.Walk(dir, func(rPath string, info os.FileInfo, err error) error {
// handleExistingFiles should only be called during init of the TierFS.
// It does 2 things:
// 1. Adds stored files to the eviction control
// 2. Remove workspace directories and all its content if it
// exist under the namespace dir.
func handleExistingFiles(eviction *lruSizeEviction, dir string) error {
var workspaceDirs []string
if err := filepath.Walk(dir, func(rPath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
// nothing to do with dirs
if info.Name() == workspaceDir {
// skipping workspaces and saving them for later delete
workspaceDirs = append(workspaceDirs, rPath)
}
return nil
}

eviction.store(relativePath(rPath), info.Size())
return nil
})
}); err != nil {
return fmt.Errorf("walking the fs dir: %w", err)
}

for _, dir := range workspaceDirs {
if err := os.RemoveAll(dir); err != nil {
return fmt.Errorf("removing dir: %w", err)
}
}

return nil
}

func (tfs *ImmutableTierFS) removeFromLocal(rPath relativePath) {
_ = os.Remove(path.Join(tfs.fsLocalBaseDir, string(rPath)))
}

// Store adds the local file to the FS.
// on successful operation, file will no longer be available under the originalPath.
func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) error {
return tfs.store(namespace, originalPath, filename)
}

func (tfs *ImmutableTierFS) store(namespace, originalPath, filename string) error {
f, err := os.Open(originalPath)
if err != nil {
return fmt.Errorf("open file: %w", err)
Expand All @@ -101,7 +140,7 @@ func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) erro
return fmt.Errorf("closing file: %w", err)
}

if err := tfs.createNSDir(namespace); err != nil {
if err := tfs.createNSWorkspaceDir(namespace); err != nil {
return fmt.Errorf("create namespace dir: %w", err)
}

Expand All @@ -114,32 +153,30 @@ func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) erro
return nil
}

// Create creates a new file in TierFS.
// File isn't stored in TierFS until a successful close operation.
// Open(namespace, filename) calls will return an error before the close was called.
func (tfs *ImmutableTierFS) Create(namespace, filename string) (*File, error) {
if err := tfs.createNSDir(namespace); err != nil {
if err := tfs.createNSWorkspaceDir(namespace); err != nil {
return nil, fmt.Errorf("create namespace dir: %w", err)
}

fileRef := tfs.newLocalFileRef(namespace, filename)
fh, err := os.Create(fileRef.fullPath)
tempPath := tfs.workspaceFilePath(namespace, filename)
fh, err := os.Create(tempPath)
if err != nil {
return nil, fmt.Errorf("creating file: %w", err)
}

return &File{
fh: fh,
rPath: fileRef.fsRelativePath,
access: tfs.eviction,
close: tfs.adapterStore(fileRef, fh),
fh: fh,
readOnly: false,
eviction: tfs.eviction,
store: func() error {
return tfs.store(namespace, tempPath, filename)
},
}, nil
}

func (tfs *ImmutableTierFS) adapterStore(fileRef localFileRef, reader io.Reader) func(size int64) error {
return func(size int64) error {
tfs.eviction.store(fileRef.fsRelativePath, size)
return tfs.adaptor.Put(tfs.objPointer(fileRef.namespace, fileRef.filename), size, reader, block.PutOpts{})
}
}

// Load returns the a file descriptor to the local file.
// If the file is missing from the local disk, it will try to fetch it from the block storage.
func (tfs *ImmutableTierFS) Open(namespace, filename string) (*File, error) {
Expand Down Expand Up @@ -169,9 +206,10 @@ func (tfs *ImmutableTierFS) openFile(fileRef localFileRef, fh *os.File) (*File,

tfs.eviction.store(fileRef.fsRelativePath, stat.Size())
return &File{
fh: fh,
rPath: fileRef.fsRelativePath,
access: tfs.eviction,
fh: fh,
readOnly: true,
rPath: fileRef.fsRelativePath,
eviction: tfs.eviction,
}, nil
}

Expand Down Expand Up @@ -227,18 +265,25 @@ func (tfs *ImmutableTierFS) newLocalFileRef(namespace, filename string) localFil
}
}

func (tfs *ImmutableTierFS) blockStoragePath(filename string) string {
return path.Join(tfs.remotePrefix, filename)
}

func (tfs *ImmutableTierFS) objPointer(namespace, filename string) block.ObjectPointer {
return block.ObjectPointer{
StorageNamespace: namespace,
Identifier: tfs.blockStoragePath(filename),
}
}

func (tfs *ImmutableTierFS) createNSDir(namespace string) error {
dir := path.Join(tfs.fsLocalBaseDir, namespace)
return os.MkdirAll(dir, os.ModePerm)
func (tfs *ImmutableTierFS) blockStoragePath(filename string) string {
return path.Join(tfs.remotePrefix, filename)
}

func (tfs *ImmutableTierFS) createNSWorkspaceDir(namespace string) error {
return os.MkdirAll(tfs.workspaceDirPath(namespace), os.ModePerm)
}

func (tfs *ImmutableTierFS) workspaceDirPath(namespace string) string {
return path.Join(tfs.fsLocalBaseDir, namespace, workspaceDir)
}

func (tfs *ImmutableTierFS) workspaceFilePath(namespace string, filename string) string {
return path.Join(tfs.workspaceDirPath(namespace), filename)
}

0 comments on commit 77ed7cb

Please sign in to comment.