-
Notifications
You must be signed in to change notification settings - Fork 361
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
First version of the immutable tiered storage
- Loading branch information
Showing
5 changed files
with
205 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package pyramid | ||
|
||
import "os" | ||
|
||
type File struct { | ||
fh *os.File | ||
access func() | ||
release func() | ||
} | ||
|
||
func (f *File) Read(p []byte) (n int, err error) { | ||
f.access() | ||
return f.fh.Read(p) | ||
} | ||
|
||
func (f *File) ReadAt(p []byte, off int64) (n int, err error) { | ||
f.access() | ||
return f.fh.ReadAt(p, off) | ||
} | ||
|
||
func (f *File) Write(p []byte) (n int, err error) { | ||
f.access() | ||
return f.fh.Write(p) | ||
} | ||
|
||
func (f *File) Stat() (os.FileInfo, error) { | ||
return f.fh.Stat() | ||
} | ||
|
||
func (f *File) Sync() error { | ||
return f.fh.Sync() | ||
} | ||
|
||
func (f *File) Close() error { | ||
f.release() | ||
return f.Close() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package pyramid | ||
|
||
// StorageType is enumeration of the different tiered storage types | ||
// supported by pyramid. | ||
type StorageType int | ||
|
||
const ( | ||
StorageTypeSSTable StorageType = iota | ||
StorageTypeTreeManifest | ||
) | ||
|
||
// FS is pyramid abstraction of filesystem where the persistent storage-layer is the block storage. | ||
// Files on the Local disk are transient and might be cleaned up by the eviction policy. | ||
type FS interface { | ||
// Store uploads the file to the block storage. | ||
// It may remove the file from the local storage. | ||
Store(t StorageType, filename string) error | ||
|
||
// Open finds the referenced file and returns the file descriptor. | ||
// If file isn't in the local disk, it is fetched from the block storage. | ||
Open(t StorageType, filename string) (*File,error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package pyramid | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
"os" | ||
"path" | ||
"time" | ||
|
||
"github.com/treeverse/lakefs/block" | ||
) | ||
|
||
type storageProps struct { | ||
t StorageType | ||
localDir string | ||
blockStoragePrefix string | ||
} | ||
|
||
// ImmutableTierFS is a filesystem where written files are never edited. | ||
// All files are stored in the block storage. Local paths are treated as a | ||
// cache layer that will be evicted according to the given eviction algorithm. | ||
type ImmutableTierFS struct { | ||
adaptor block.Adapter | ||
config map[StorageType]storageProps | ||
|
||
// TODO: use refs anc last-access for the eviction algorithm | ||
refCount map[StorageType]map[string]int | ||
lastAccess map[StorageType]map[string]time.Time | ||
} | ||
|
||
// mapping between supported storage types and their prefix | ||
var types = map[StorageType]string{ | ||
StorageTypeSSTable: "sstables", | ||
StorageTypeTreeManifest: "trees", | ||
} | ||
|
||
func NewTierFS(adaptor block.Adapter, localdir, blockStoragePrefix string) *ImmutableTierFS { | ||
refCount := map[StorageType]map[string]int{} | ||
lastAccess := map[StorageType]map[string]time.Time{} | ||
config := map[StorageType]storageProps{} | ||
for t, prefix := range types { | ||
refCount[t] = map[string]int{} | ||
lastAccess[t] = map[string]time.Time{} | ||
config[t] = storageProps{ | ||
t: t, | ||
localDir: path.Join(localdir, prefix), | ||
blockStoragePrefix: path.Join(blockStoragePrefix, prefix), | ||
} | ||
} | ||
|
||
return &ImmutableTierFS{ | ||
adaptor: adaptor, | ||
refCount: refCount, | ||
lastAccess: lastAccess, | ||
config: config, | ||
} | ||
} | ||
|
||
// Store uploads the local file to the block storage for persistence. | ||
func (tfs *ImmutableTierFS) Store(t StorageType, ns, filename string) error { | ||
localpath := tfs.localpath(t, filename) | ||
f, err := os.Open(localpath) | ||
if err != nil { | ||
return fmt.Errorf("open file: %w", err) | ||
} | ||
defer f.Close() | ||
|
||
stat, err := f.Stat() | ||
if err != nil { | ||
return fmt.Errorf("file stat: %w", err) | ||
} | ||
|
||
return tfs.adaptor.Put(tfs.objPointer(t, ns, filename), stat.Size(), f, block.PutOpts{}) | ||
} | ||
|
||
// Load returns the a file descriptor to the local file. | ||
// If the file is missing from the local disk, it will try to fetch it from the block storage. | ||
func (tfs *ImmutableTierFS) Load(t StorageType, ns, filename string) (*File, error) { | ||
localPath := tfs.localpath(t, filename) | ||
fh, err := os.Open(localPath) | ||
if err != nil { | ||
if os.IsNotExist(err) { | ||
fh, err = tfs.readFromBlockStorage(t, ns, filename) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
return nil, fmt.Errorf("open file: %w", err) | ||
} | ||
} | ||
|
||
// TODO: refs thread-safe | ||
tfs.refCount[t][filename] = tfs.refCount[t][filename] + 1 | ||
return &File{ | ||
fh: fh, | ||
access: func() { | ||
tfs.lastAccess[t][filename] = time.Now() | ||
}, | ||
release: func() { | ||
tfs.refCount[t][filename] = tfs.refCount[t][filename] - 1 | ||
}, | ||
}, nil | ||
} | ||
|
||
func (tfs *ImmutableTierFS) readFromBlockStorage(t StorageType, ns string, filename string) (*os.File, error) { | ||
reader, err := tfs.adaptor.Get(tfs.objPointer(t, ns, filename), 0) | ||
if err != nil { | ||
return nil, fmt.Errorf("read from block storage: %w", err) | ||
} | ||
defer reader.Close() | ||
|
||
localPath := tfs.localpath(t, filename) | ||
writer, err := os.Create(localPath) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating file: %w", err) | ||
} | ||
defer writer.Close() | ||
|
||
if _, err := io.Copy(writer, reader); err != nil { | ||
return nil, fmt.Errorf("copying date to file: %w", err) | ||
} | ||
|
||
fh, err := os.Open(localPath) | ||
if err != nil { | ||
return nil, fmt.Errorf("open file: %w", err) | ||
} | ||
return fh, nil | ||
} | ||
|
||
func (tfs *ImmutableTierFS) localpath(t StorageType, filename string) string { | ||
return path.Join(tfs.config[t].localDir, filename) | ||
} | ||
|
||
func (tfs *ImmutableTierFS) blockStoragePath(t StorageType, filename string) string { | ||
return path.Join(tfs.config[t].blockStoragePrefix, filename) | ||
} | ||
|
||
func (tfs *ImmutableTierFS) objPointer(t StorageType, ns string, filename string) block.ObjectPointer { | ||
return block.ObjectPointer{ | ||
StorageNamespace: ns, | ||
Identifier: tfs.blockStoragePath(t, filename), | ||
} | ||
} |