Skip to content

Commit

Permalink
First version of the immutable tiered storage
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 committed Nov 26, 2020
1 parent 4c888fa commit b353aae
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/golang-migrate/migrate/v4 v4.12.2
github.com/golang/protobuf v1.4.2
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect
github.com/golangci/golangci-lint v1.30.0
github.com/google/uuid v1.1.1
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws=
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 h1:23T5iq8rbUYlhpt5DB4XJkc6BU31uODLD1o1gKvZmD0=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4=
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a h1:w8hkcTqaFpzKqonE9uMCefW1WDie15eSP/4MssdenaM=
Expand Down
45 changes: 45 additions & 0 deletions pyramid/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pyramid

import "os"

// File wraps an *os.File handed out by a pyramid FS and notifies the FS
// about the file's usage through optional callbacks.
//
// Any of the callbacks may be nil, in which case it is simply skipped.
type File struct {
	// fh is the underlying local file handle.
	fh *os.File

	// access, when non-nil, is invoked before every Read, ReadAt and Write.
	access func()
	// release, when non-nil, is invoked at the start of Close.
	release func()
	// close, when non-nil, is invoked by Close with the number of bytes
	// written through this File, while the underlying handle is still open.
	close func(size int64) error

	// size accumulates the byte counts returned by Write.
	size int64
}

// touch invokes the access callback if one was supplied.
func (f *File) touch() {
	if f.access != nil {
		f.access()
	}
}

// Read reads up to len(p) bytes from the underlying file, reporting the access first.
func (f *File) Read(p []byte) (n int, err error) {
	f.touch()
	return f.fh.Read(p)
}

// ReadAt reads len(p) bytes from the underlying file starting at byte offset off,
// reporting the access first.
func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
	f.touch()
	return f.fh.ReadAt(p, off)
}

// Write writes p to the underlying file and adds the number of bytes actually
// written (even on a partial write) to the running size passed to the close callback.
func (f *File) Write(p []byte) (n int, err error) {
	f.touch()
	s, err := f.fh.Write(p)
	f.size += int64(s)
	return s, err
}

// Stat returns the FileInfo of the underlying file.
func (f *File) Stat() (os.FileInfo, error) {
	return f.fh.Stat()
}

// Sync commits the underlying file's contents to stable storage.
func (f *File) Sync() error {
	return f.fh.Sync()
}

// Close runs the release callback, then the close callback with the number of
// bytes written, and finally closes the underlying file handle.
//
// The handle is closed even when the close callback fails, so that a failed
// callback does not leak the descriptor. The callback's error takes precedence
// over an error from closing the handle.
func (f *File) Close() error {
	if f.release != nil {
		f.release()
	}

	var closeErr error
	if f.close != nil {
		// Must run before fh.Close: the callback may still need the open handle.
		closeErr = f.close(f.size)
	}

	if err := f.fh.Close(); err != nil && closeErr == nil {
		closeErr = err
	}
	return closeErr
}
5 changes: 5 additions & 0 deletions pyramid/layer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package pyramid

// Layer is a storage layer that is part of a pyramid FS.
// It currently declares no methods, so every type satisfies it.
type Layer interface {
}
17 changes: 17 additions & 0 deletions pyramid/pyramid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pyramid

// FS is a pyramid abstraction of a filesystem in which the block storage is the
// persistent storage layer.
// Files on the local disk are transient and might be cleaned up by the eviction policy.
type FS interface {
	// Store adds the file found at filepath to the FS under the name filename.
	// It uploads the file to the block storage and places it under the local path.
	Store(filepath, filename string) error

	// Create creates a new file in the FS.
	// The file only becomes persistent after the returned file is closed.
	Create(filename string) (*File, error)

	// Open finds the referenced file and returns a handle to it.
	// If the file isn't on the local disk, it is fetched from the block storage.
	Open(filename string) (*File, error)
}
37 changes: 37 additions & 0 deletions pyramid/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pyramid

import (
"fmt"
"os"
"path"
)

// SharedLocalStorage is a local base directory shared by several FS instances.
// Each registered FS gets its own sub-directory under basepath.
type SharedLocalStorage struct {
	// filesystems maps a registered file system name to its FS.
	filesystems map[string]FS
	// basepath is the directory under which the per-FS directories are created.
	basepath string
}

// NewSharedLocalStorage makes sure baseFolderPath exists (creating any missing
// parent directories) and returns a SharedLocalStorage rooted at it with no
// file systems registered yet.
func NewSharedLocalStorage(baseFolderPath string) (*SharedLocalStorage, error) {
	mkdirErr := os.MkdirAll(baseFolderPath, os.ModePerm)
	if mkdirErr != nil {
		return nil, fmt.Errorf("creating base dir: %w", mkdirErr)
	}

	storage := new(SharedLocalStorage)
	storage.basepath = baseFolderPath
	storage.filesystems = make(map[string]FS)
	return storage, nil
}

// Register records fs under fsName and returns the local directory reserved for it,
// which is basepath joined with fsName.
//
// It returns an error if a file system with the same name was already registered
// on this SharedLocalStorage, or if the directory cannot be created.
func (sd *SharedLocalStorage) Register(fsName string, fs FS) (string, error) {
	if _, ok := sd.filesystems[fsName]; ok {
		return "", fmt.Errorf("file system %s already registered", fsName)
	}

	dirPath := path.Join(sd.basepath, fsName)
	// MkdirAll rather than Mkdir: the directory may be left over from a previous
	// process using the same base path, and that must not fail registration.
	// Duplicate registrations within this process are rejected by the map check above.
	if err := os.MkdirAll(dirPath, os.ModePerm); err != nil {
		return "", fmt.Errorf("creating fs dir: %w", err)
	}

	sd.filesystems[fsName] = fs
	return dirPath, nil
}
173 changes: 173 additions & 0 deletions pyramid/tierFS.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package pyramid

import (
"fmt"
"io"
"os"
"path"
"time"

"github.com/treeverse/lakefs/block"
)

//type storageProps struct {
// t StorageType
// localDir string
// blockStoragePrefix string
//}

// ImmutableTierFS is a filesystem where written files are never edited.
// All files are stored in the block storage. Local paths are treated as a
// cache layer that will be evicted according to the given eviction algorithm.
type ImmutableTierFS struct {
	// adaptor is the block storage adapter used to upload and fetch files.
	adaptor block.Adapter
	// localStorage is intended to hold the shared local storage this FS belongs to.
	localStorage *SharedLocalStorage

	// TODO: use refs and last-access for the eviction algorithm
	// refCount tracks, per filename, how many opened files have not been released yet.
	refCount map[string]int
	// lastAccess holds, per filename, the time of the latest access through an opened file.
	lastAccess map[string]time.Time

	// fsName is the name this FS is registered under.
	fsName string
	// adapterNS is the storage namespace used in object pointers passed to the adapter.
	adapterNS string

	// localBaseDir is the local directory under which this FS keeps its files.
	localBaseDir string
	// remotePrefix is the prefix of this FS's object identifiers in the block storage.
	remotePrefix string
}

// fsBlockStoragePrefix is the leading path element of the remote prefix under
// which a tier FS stores its objects in the block storage.
const fsBlockStoragePrefix = "_lakeFS"

//// mapping between supported storage types and their prefix
//var types = map[StorageType]string{
// StorageTypeSSTable: "sstables",
// StorageTypeTreeManifest: "trees",
//}

// NewTierFS creates an ImmutableTierFS named fsName that persists files through
// adaptor in the adapterNS storage namespace, and registers it with localStorage
// to obtain its local base directory.
//
// It panics if the registration with localStorage fails.
func NewTierFS(adaptor block.Adapter, localStorage *SharedLocalStorage, fsName, adapterNS string) *ImmutableTierFS {
	fs := &ImmutableTierFS{
		adaptor: adaptor,
		// Keep a reference to the shared storage; the field was previously left nil.
		localStorage: localStorage,
		refCount:     map[string]int{},
		lastAccess:   map[string]time.Time{},
		fsName:       fsName,
		adapterNS:    adapterNS,
		remotePrefix: path.Join(fsBlockStoragePrefix, fsName),
	}
	fsDir, err := localStorage.Register(fsName, fs)
	if err != nil {
		panic(err)
	}
	fs.localBaseDir = fsDir

	return fs
}

// Store adds the local file at originalPath to the FS under the name filename.
//
// The file is first uploaded to the block storage and then moved (renamed) from
// originalPath into the FS's local directory. It returns an error if the file
// cannot be opened, stat'ed, uploaded, closed or renamed.
func (tfs *ImmutableTierFS) Store(originalPath, filename string) error {
	f, err := os.Open(originalPath)
	if err != nil {
		return fmt.Errorf("open file: %w", err)
	}

	stat, err := f.Stat()
	if err != nil {
		_ = f.Close() // best effort; the stat failure is the error worth reporting
		return fmt.Errorf("file stat: %w", err)
	}

	if err := tfs.adaptor.Put(tfs.objPointer(filename), stat.Size(), f, block.PutOpts{}); err != nil {
		_ = f.Close() // best effort; the upload failure is the error worth reporting
		return fmt.Errorf("adapter put: %w", err)
	}

	// Close before renaming so no open handle refers to the old path.
	if err := f.Close(); err != nil {
		return fmt.Errorf("closing file: %w", err)
	}

	if err := os.Rename(originalPath, tfs.localpath(filename)); err != nil {
		return fmt.Errorf("rename file: %w", err)
	}
	return nil
}

// Create creates (or truncates) the local file for filename and returns a File
// for writing to it.
//
// The content is uploaded to the block storage only when the returned File is
// closed; until then the file exists solely on the local disk.
func (tfs *ImmutableTierFS) Create(filename string) (*File, error) {
	localpath := tfs.localpath(filename)
	fh, err := os.Create(localpath)
	if err != nil {
		return nil, fmt.Errorf("creating file: %w", err)
	}

	return &File{
		fh:      fh,
		access:  func() {},
		release: func() {},
		close: func(size int64) error {
			// After the writes the handle's offset is at the end of the data,
			// so rewind it; otherwise the upload would read nothing.
			if _, err := fh.Seek(0, io.SeekStart); err != nil {
				return fmt.Errorf("rewinding file: %w", err)
			}
			return tfs.adaptor.Put(tfs.objPointer(filename), size, fh, block.PutOpts{})
		},
	}, nil
}

// Open returns a File backed by the local copy of filename.
// If the file is missing from the local disk, it is first fetched from the
// block storage into the local directory.
//
// Opening increments the file's reference count; closing the returned File
// decrements it. Every read through the File refreshes its last-access time.
func (tfs *ImmutableTierFS) Open(filename string) (*File, error) {
	fh, err := os.Open(tfs.localpath(filename))
	if os.IsNotExist(err) {
		fh, err = tfs.readFromBlockStorage(filename)
		if err != nil {
			return nil, err
		}
	} else if err != nil {
		return nil, fmt.Errorf("open file: %w", err)
	}

	// TODO: refs thread-safe
	tfs.refCount[filename]++
	return &File{
		fh: fh,
		access: func() {
			tfs.lastAccess[filename] = time.Now()
		},
		release: func() {
			tfs.refCount[filename]--
		},
		// Opened files are already persistent, so there is nothing to do on close.
		// The callback must still be non-nil because File.Close invokes it.
		close: func(int64) error { return nil },
	}, nil
}

// readFromBlockStorage downloads filename from the block storage into the FS's
// local directory and returns a read handle to the local copy.
//
// If the download or the final flush fails, the partially written local file is
// removed so that a later Open does not mistake it for a complete copy.
func (tfs *ImmutableTierFS) readFromBlockStorage(filename string) (*os.File, error) {
	reader, err := tfs.adaptor.Get(tfs.objPointer(filename), 0)
	if err != nil {
		return nil, fmt.Errorf("read from block storage: %w", err)
	}
	defer reader.Close()

	localPath := tfs.localpath(filename)
	writer, err := os.Create(localPath)
	if err != nil {
		return nil, fmt.Errorf("creating file: %w", err)
	}

	if _, err := io.Copy(writer, reader); err != nil {
		// Best-effort cleanup; the copy failure is the error worth reporting.
		_ = writer.Close()
		_ = os.Remove(localPath)
		return nil, fmt.Errorf("copying data to file: %w", err)
	}

	// Close explicitly (not deferred) so a failed close is reported, not silently dropped.
	if err := writer.Close(); err != nil {
		_ = os.Remove(localPath)
		return nil, fmt.Errorf("closing file: %w", err)
	}

	fh, err := os.Open(localPath)
	if err != nil {
		return nil, fmt.Errorf("open file: %w", err)
	}
	return fh, nil
}

// localpath returns the location of filename inside this FS's local base directory.
func (tfs *ImmutableTierFS) localpath(filename string) string {
	local := path.Join(tfs.localBaseDir, filename)
	return local
}

// blockStoragePath returns the block storage identifier of filename, i.e. the
// FS's remote prefix joined with filename.
func (tfs *ImmutableTierFS) blockStoragePath(filename string) string {
	remote := path.Join(tfs.remotePrefix, filename)
	return remote
}

// objPointer builds the block storage object pointer for filename: the FS's
// adapter namespace combined with the file's block storage path.
func (tfs *ImmutableTierFS) objPointer(filename string) block.ObjectPointer {
	var ptr block.ObjectPointer
	ptr.StorageNamespace = tfs.adapterNS
	ptr.Identifier = tfs.blockStoragePath(filename)
	return ptr
}

0 comments on commit b353aae

Please sign in to comment.