Skip to content

Commit

Permalink
Handle file refs better, add startup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 committed Nov 30, 2020
1 parent 94eab34 commit b9ac0b4
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 59 deletions.
18 changes: 18 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/treeverse/lakefs/pyramid"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -28,6 +30,10 @@ const (
DefaultBlockStoreS3StreamingChunkSize = 2 << 19 // 1MiB by default per chunk
DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first

DefaultDiskAllocatedSize = 1 * 1024 * 1024 * 1024
DefaultDiskBaseDir = "~/lakefs/metadata"
DefaultDiskBlockStoragePrefix = "_lakefs"

DefaultBlockStoreGSS3Endpoint = "https://storage.googleapis.com"

DefaultAuthCacheEnabled = true
Expand Down Expand Up @@ -88,6 +94,10 @@ func setDefaults() {
viper.SetDefault("blockstore.s3.streaming_chunk_timeout", DefaultBlockStoreS3StreamingChunkTimeout)
viper.SetDefault("blockstore.s3.max_retries", DefaultS3MaxRetries)

viper.SetDefault("disk.allocated_size", DefaultDiskAllocatedSize)
viper.SetDefault("disk.base_dir", DefaultDiskBaseDir)
viper.SetDefault("disk.block_storage_prefix", DefaultDiskBlockStoragePrefix)

viper.SetDefault("gateways.s3.domain_name", DefaultS3GatewayDomainName)
viper.SetDefault("gateways.s3.region", DefaultS3GatewayRegion)

Expand All @@ -107,6 +117,14 @@ func (c *Config) GetDatabaseParams() dbparams.Database {
}
}

func (c *Config) GetLocalDiskParams() pyramid.Params {
return pyramid.Params{
AllocatedSize: viper.GetInt64("disk.allocated_size"),
BaseDir: viper.GetString("disk.base_dir"),
BlockStoragePrefix: viper.GetString("disk.block_storage_prefix"),
}
}

func (c *Config) GetCatalogerType() string {
return viper.GetString("cataloger.type")
}
Expand Down
14 changes: 7 additions & 7 deletions pyramid/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type evictionControl struct {
cache *ristretto.Cache
}

func newEvictionControl(capacity, estimatedFileSize int64, evict func(filename string)) (*evictionControl, error) {
func newEvictionControl(capacity, estimatedFileSize int64, evict func(rPath relativePath)) (*evictionControl, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 10 * capacity / estimatedFileSize,
MaxCost: capacity,
Expand All @@ -25,17 +25,17 @@ func newEvictionControl(capacity, estimatedFileSize int64, evict func(filename s
}, nil
}

func onEvict(evict func(localpath string)) func(uint64, uint64, interface{}, int64) {
func onEvict(evict func(rPath relativePath)) func(uint64, uint64, interface{}, int64) {
return func(_, _ uint64, value interface{}, _ int64) {
evict(value.(string))
evict(value.(relativePath))
}
}

// touch updates last access time for the file
func (am *evictionControl) touch(localpath string) {
am.cache.Get(localpath)
func (am *evictionControl) touch(rPath relativePath) {
am.cache.Get(rPath)
}

func (am *evictionControl) store(localpath string, filesize int64) {
am.cache.Set(localpath, localpath, filesize)
func (am *evictionControl) store(rPath relativePath, filesize int64) {
am.cache.Set(rPath, rPath, filesize)
}
6 changes: 3 additions & 3 deletions pyramid/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ type File struct {
fh *os.File
access *evictionControl

localpath string
rPath relativePath

close func(size int64) error
size int64
}

func (f *File) Read(p []byte) (n int, err error) {
f.access.touch(f.localpath)
f.access.touch(f.rPath)
return f.fh.Read(p)
}

func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
f.access.touch(f.localpath)
f.access.touch(f.rPath)
return f.fh.ReadAt(p, off)
}

Expand Down
15 changes: 15 additions & 0 deletions pyramid/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pyramid

// Params is pyramid.FS params that are identical for all file-systems
// in a single lakeFS instance.
type Params struct {
// AllocatedSize is the disk size in bytes that lakeFS is allowed to use.
AllocatedSize int64

// BaseDir is the local directory where lakeFS app is storing the files.
BaseDir string

// BlockStoragePrefix is the prefix prepended to lakeFS metadata files in
// the blockstore.
BlockStoragePrefix string
}
140 changes: 91 additions & 49 deletions pyramid/tierFS.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"
"path"
"path/filepath"

"github.com/treeverse/lakefs/block"
)
Expand All @@ -22,41 +23,62 @@ type ImmutableTierFS struct {
remotePrefix string
}

const (
fsBlockStoragePrefix = "_lakeFS"
type Config struct {
fsName string
adaptor block.Adapter

// TODO: to flags
localBaseDir = "/local/lakeFS"
estimatedFilesize = 10 * 1024 * 1024
)
fsBlockStoragePrefix string
localBaseDir string

allocatedDiskSize int64
estimatedFilesize int64
}

func NewTierFS(fsName string, adaptor block.Adapter, allocatedDiskSize int64) (FS, error) {
fsLocalBaseDir := path.Join(localBaseDir, fsName)
// NewTierFS creates a new TierFS.
// It will traverse the existing local folders and will update
// the local disk cache to reflect existing files.
func NewTierFS(c *Config) (FS, error) {
fsLocalBaseDir := path.Join(c.localBaseDir, c.fsName)
if err := os.MkdirAll(fsLocalBaseDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("creating base dir: %w", err)
}

// TODO(itai): handle files that exist in dir on start-up:
// 1. discover namespaces
// 2. fill eviction-control with the files

tierFS := &ImmutableTierFS{
adaptor: adaptor,
fsName: fsName,
adaptor: c.adaptor,
fsName: c.fsName,
fsLocalBaseDir: fsLocalBaseDir,
remotePrefix: path.Join(fsBlockStoragePrefix, fsName),
remotePrefix: path.Join(c.fsBlockStoragePrefix, c.fsName),
}
eviction, err := newEvictionControl(allocatedDiskSize, estimatedFilesize, tierFS.removeFromLocal)
eviction, err := newEvictionControl(c.allocatedDiskSize, c.estimatedFilesize, tierFS.removeFromLocal)
if err != nil {
return nil, fmt.Errorf("creating eviction control :%w", err)
}

if err := addExistingFiles(eviction, fsLocalBaseDir); err != nil {
return nil, fmt.Errorf("adding existing files to eviction:%w", err)
}

tierFS.eviction = eviction
return tierFS, nil
}

func (tfs *ImmutableTierFS) removeFromLocal(localPath string) {
_ = os.Remove(localPath)
func addExistingFiles(eviction *evictionControl, dir string) error {
return filepath.Walk(dir, func(rPath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
// nothing to do with dirs
return nil
}

eviction.store(relativePath(rPath), info.Size())
return nil
})
}

func (tfs *ImmutableTierFS) removeFromLocal(rPath relativePath) {
_ = os.Remove(path.Join(tfs.fsLocalBaseDir, string(rPath)))
}

// Store adds the local file to the FS.
Expand All @@ -83,12 +105,12 @@ func (tfs *ImmutableTierFS) Store(namespace, originalPath, filename string) erro
return fmt.Errorf("create namespace dir: %w", err)
}

localpath := tfs.localpath(namespace, filename)
if err := os.Rename(originalPath, localpath); err != nil {
fileRef := tfs.newLocalFileRef(namespace, filename)
if err := os.Rename(originalPath, fileRef.fullPath); err != nil {
return fmt.Errorf("rename file: %w", err)
}

tfs.eviction.store(filename, stat.Size())
tfs.eviction.store(fileRef.fsRelativePath, stat.Size())
return nil
}

Expand All @@ -97,72 +119,73 @@ func (tfs *ImmutableTierFS) Create(namespace, filename string) (*File, error) {
return nil, fmt.Errorf("create namespace dir: %w", err)
}

localpath := tfs.localpath(namespace, filename)
fh, err := os.Create(localpath)
fileRef := tfs.newLocalFileRef(namespace, filename)
fh, err := os.Create(fileRef.fullPath)
if err != nil {
return nil, fmt.Errorf("creating file: %w", err)
}

return &File{
fh: fh,
localpath: localpath,
access: tfs.eviction,
close: tfs.adapterStore(namespace, filename, localpath, fh),
fh: fh,
rPath: fileRef.fsRelativePath,
access: tfs.eviction,
close: tfs.adapterStore(fileRef, fh),
}, nil
}

func (tfs *ImmutableTierFS) adapterStore(namespace string, filename string, localpath string, fh *os.File) func(size int64) error {
func (tfs *ImmutableTierFS) adapterStore(fileRef localFileRef, reader io.Reader) func(size int64) error {
return func(size int64) error {
tfs.eviction.store(localpath, size)
return tfs.adaptor.Put(tfs.objPointer(namespace, filename), size, fh, block.PutOpts{})
tfs.eviction.store(fileRef.fsRelativePath, size)
return tfs.adaptor.Put(tfs.objPointer(fileRef.namespace, fileRef.filename), size, reader, block.PutOpts{})
}
}

// Load returns the a file descriptor to the local file.
// If the file is missing from the local disk, it will try to fetch it from the block storage.
func (tfs *ImmutableTierFS) Open(namespace, filename string) (*File, error) {
localPath := tfs.localpath(namespace, filename)
fh, err := os.Open(localPath)
fileRef := tfs.newLocalFileRef(namespace, filename)
fh, err := os.Open(fileRef.fullPath)
if err == nil {
return tfs.openFile(localPath, fh)
return tfs.openFile(fileRef, fh)
}
if !os.IsNotExist(err) {
return nil, fmt.Errorf("open file: %w", err)
}

fh, err = tfs.readFromBlockStorage(namespace, filename)
fh, err = tfs.readFromBlockStorage(fileRef)
if err != nil {
return nil, err
}

tfs.eviction.touch(localPath)
return tfs.openFile(localPath, fh)
return tfs.openFile(fileRef, fh)
}

func (tfs *ImmutableTierFS) openFile(localPath string, fh *os.File) (*File, error) {
// openFile converts an os.File to pyramid.File and updates the eviction control.
func (tfs *ImmutableTierFS) openFile(fileRef localFileRef, fh *os.File) (*File, error) {
stat, err := fh.Stat()
if err != nil {
return nil, fmt.Errorf("file stat: %w", err)
}

tfs.eviction.store(localPath, stat.Size())

tfs.eviction.store(fileRef.fsRelativePath, stat.Size())
return &File{
fh: fh,
localpath: localPath,
access: tfs.eviction,
fh: fh,
rPath: fileRef.fsRelativePath,
access: tfs.eviction,
}, nil
}

func (tfs *ImmutableTierFS) readFromBlockStorage(namespace, filename string) (*os.File, error) {
reader, err := tfs.adaptor.Get(tfs.objPointer(namespace, filename), 0)
// readFromBlockStorage reads the referenced file from the block storage
// and places it in the local FS for further reading.
// It returns a file handle to the local file.
func (tfs *ImmutableTierFS) readFromBlockStorage(fileRef localFileRef) (*os.File, error) {
reader, err := tfs.adaptor.Get(tfs.objPointer(fileRef.namespace, fileRef.filename), 0)
if err != nil {
return nil, fmt.Errorf("read from block storage: %w", err)
}
defer reader.Close()

localPath := tfs.localpath(namespace, filename)
writer, err := os.Create(localPath)
writer, err := os.Create(fileRef.fullPath)
if err != nil {
return nil, fmt.Errorf("creating file: %w", err)
}
Expand All @@ -174,15 +197,34 @@ func (tfs *ImmutableTierFS) readFromBlockStorage(namespace, filename string) (*o
return nil, fmt.Errorf("writer close: %w", err)
}

fh, err := os.Open(localPath)
fh, err := os.Open(fileRef.fullPath)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
return fh, nil
}

func (tfs *ImmutableTierFS) localpath(namespace, filename string) string {
return path.Join(tfs.fsLocalBaseDir, namespace, filename)
// relativePath is the path of the file under TierFS
type relativePath string

// localFileRef consists of all possible local file references
type localFileRef struct {
namespace string
filename string

fullPath string
fsRelativePath relativePath
}

func (tfs *ImmutableTierFS) newLocalFileRef(namespace, filename string) localFileRef {
relative := path.Join(namespace, filename)
return localFileRef{
namespace: namespace,
filename: filename,

fsRelativePath: relativePath(relative),
fullPath: path.Join(tfs.fsLocalBaseDir, relative),
}
}

func (tfs *ImmutableTierFS) blockStoragePath(filename string) string {
Expand Down

0 comments on commit b9ac0b4

Please sign in to comment.