diff --git a/config/config.go b/config/config.go index 4d8d3640a1b..32d63a6e034 100644 --- a/config/config.go +++ b/config/config.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/treeverse/lakefs/pyramid" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" @@ -28,6 +30,10 @@ const ( DefaultBlockStoreS3StreamingChunkSize = 2 << 19 // 1MiB by default per chunk DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first + DefaultDiskAllocatedBytes = 1 * 1024 * 1024 * 1024 + DefaultDiskBaseDir = "~/lakefs/local_tier" + DefaultDiskBlockStoragePrefix = "_lakefs" + DefaultBlockStoreGSS3Endpoint = "https://storage.googleapis.com" DefaultAuthCacheEnabled = true @@ -88,6 +94,10 @@ func setDefaults() { viper.SetDefault("blockstore.s3.streaming_chunk_timeout", DefaultBlockStoreS3StreamingChunkTimeout) viper.SetDefault("blockstore.s3.max_retries", DefaultS3MaxRetries) + viper.SetDefault("disk.allocated_bytes", DefaultDiskAllocatedBytes) + viper.SetDefault("disk.base_dir", DefaultDiskBaseDir) + viper.SetDefault("disk.block_storage_prefix", DefaultDiskBlockStoragePrefix) + viper.SetDefault("gateways.s3.domain_name", DefaultS3GatewayDomainName) viper.SetDefault("gateways.s3.region", DefaultS3GatewayRegion) @@ -107,6 +117,14 @@ func (c *Config) GetDatabaseParams() dbparams.Database { } } +func (c *Config) GetLocalDiskParams() pyramid.Params { + return pyramid.Params{ + AllocatedBytes: viper.GetInt64("disk.allocated_bytes"), + BaseDir: viper.GetString("disk.base_dir"), + BlockStoragePrefix: viper.GetString("disk.block_storage_prefix"), + } +} + func (c *Config) GetCatalogerType() string { return viper.GetString("cataloger.type") } diff --git a/go.mod b/go.mod index dae8c309f9b..8ccf9bb0928 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/gofrs/uuid v3.3.0+incompatible // indirect github.com/golang-migrate/migrate/v4 v4.12.2 github.com/golang/protobuf v1.4.2 + github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect github.com/golangci/golangci-lint v1.30.0 github.com/google/uuid v1.1.1 github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect @@ -72,6 +73,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/thanhpk/randstr v1.0.4 github.com/tidwall/pretty v1.0.1 // indirect + github.com/treeverse/golang-lru v0.6.1 github.com/tsenart/vegeta/v12 v12.8.3 github.com/vbauerster/mpb/v5 v5.3.0 github.com/xitongsys/parquet-go v1.5.2 diff --git a/go.sum b/go.sum index 6cd3623f37b..655ddba213f 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,7 @@ github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+q github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= @@ -449,6 +450,8 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws= +github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 h1:23T5iq8rbUYlhpt5DB4XJkc6BU31uODLD1o1gKvZmD0= github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4= github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a h1:w8hkcTqaFpzKqonE9uMCefW1WDie15eSP/4MssdenaM= @@ -600,7 +603,6 @@ github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= -github.com/jackc/pgconn v1.7.2 h1:195tt17jkjy+FrFlY0pgyrul5kRLb7BGXY3JTrNxeXU= github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6 h1:geJ1mgTGd0WQo67wEd+H4OjFG5uA2e3cEBz9D5+pftU= github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= @@ -1025,6 +1027,7 @@ github.com/sonatard/noctx v0.0.1/go.mod h1:9D2D/EoULe8Yy2joDHJj7bv3sZoq9AaSb8B4l github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/go-diff v0.5.3 h1:lhIKJ2nXLZZ+AfbHpYxTn0pXpNTTui0DX7DO3xeb1Zs= github.com/sourcegraph/go-diff v0.5.3/go.mod h1:v9JDtjCE4HHHCZGId75rg8gkKKa98RVjBcBGsVmMmak= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= @@ -1096,6 +1099,8 @@ github.com/tommy-muehle/go-mnd v1.3.1-0.20200224220436-e6f9a994e8fa h1:RC4maTWLK github.com/tommy-muehle/go-mnd v1.3.1-0.20200224220436-e6f9a994e8fa/go.mod h1:dSUh0FtTP8VhvkL1S+gUR1OKd9ZnSaozuI6r3m6wOig= github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9rrstGQ= github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM= +github.com/treeverse/golang-lru v0.6.1 h1:G7nAh8gqSChNEwRpC4MopsIhKvE5cJVQLgKsZhr6cmg= +github.com/treeverse/golang-lru v0.6.1/go.mod h1:q66C/bXLqkppmXQPBfsdbuzj1ELfQeOc4/SMS3SmDrc= github.com/tsenart/go-tsz v0.0.0-20180814232043-cdeb9e1e981e/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo= github.com/tsenart/vegeta/v12 v12.8.3 h1:UEsDkSrEJojMKW/xr7KUv4H/bYykX+V48KCsPZPqEfk= github.com/tsenart/vegeta/v12 v12.8.3/go.mod h1:ZiJtwLn/9M4fTPdMY7bdbIeyNeFVE8/AHbWFqCsUuho= diff --git a/pyramid/eviction.go b/pyramid/eviction.go new file mode 100644 index 00000000000..d395f4a6dc6 --- /dev/null +++ b/pyramid/eviction.go @@ -0,0 +1,39 @@ +package pyramid + +import ( + "fmt" + + lru "github.com/treeverse/golang-lru" + "github.com/treeverse/golang-lru/simplelru" +) + +// eviction is an abstraction of the eviction control for easy testing +type eviction interface { + touch(rPath relativePath) + store(rPath relativePath, filesize int64) int +} + +type lruSizeEviction struct { + cache simplelru.LRUCache +} + +func newLRUSizeEviction(capacity int64, evict func(rPath relativePath)) (eviction, error) { + cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, _ int64) { + evict(key.(relativePath)) + }) + if err != nil { + return nil, fmt.Errorf("creating cache: %w", err) + } + return &lruSizeEviction{ + cache: cache, + }, nil +} + +func (am *lruSizeEviction) touch(rPath relativePath) { + // update last access time, value is meaningless + am.cache.Get(rPath) +} + +func (am *lruSizeEviction) store(rPath relativePath, filesize int64) int { + return am.cache.Add(rPath, nil, filesize) +} diff --git a/pyramid/file.go b/pyramid/file.go new file mode 100644 index 00000000000..70e70f672af --- /dev/null +++ b/pyramid/file.go @@ -0,0 +1,65 @@ +package pyramid + +import ( + "fmt" + "os" +) + +// File is pyramid wrapper for os.file that triggers pyramid hooks for file actions. +type File struct { + fh *os.File + + closed bool + persisted bool + store func(string) error +} + +func (f *File) Read(p []byte) (n int, err error) { + return f.fh.Read(p) +} + +func (f *File) ReadAt(p []byte, off int64) (n int, err error) { + return f.fh.ReadAt(p, off) +} + +func (f *File) Write(p []byte) (n int, err error) { + return f.fh.Write(p) +} + +func (f *File) Stat() (os.FileInfo, error) { + return f.fh.Stat() +} + +func (f *File) Sync() error { + return f.fh.Sync() +} + +func (f *File) Close() error { + f.closed = true + return f.fh.Close() +} + +var ( + errAlreadyPersisted = fmt.Errorf("file is already persisted") + errFileNotClosed = fmt.Errorf("file isn't closed") +) + +// Store copies the closed file to all tiers of the pyramid. +func (f *File) Store(filename string) error { + if err := validateFilename(filename); err != nil { + return err + } + + if f.persisted { + return errAlreadyPersisted + } + if !f.closed { + return errFileNotClosed + } + + err := f.store(filename) + if err == nil { + f.persisted = true + } + return err +} diff --git a/pyramid/file_test.go b/pyramid/file_test.go new file mode 100644 index 00000000000..036dd994d48 --- /dev/null +++ b/pyramid/file_test.go @@ -0,0 +1,134 @@ +package pyramid + +import ( + "io/ioutil" + "os" + "path" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/google/uuid" +) + +func TestPyramidWriteFile(t *testing.T) { + filename := uuid.New().String() + + fh, err := ioutil.TempFile("", filename) + if err != nil { + panic(err) + } + + filepath := fh.Name() + defer os.Remove(filepath) + + storeCalled := false + sut := File{ + fh: fh, + store: func(string) error { + storeCalled = true + return nil + }, + } + + content := "some content to write to file" + n, err := sut.Write([]byte(content)) + require.Equal(t, len(content), n) + require.NoError(t, err) + require.NoError(t, sut.Sync()) + + _, err = sut.Stat() + require.NoError(t, err) + + require.NoError(t, sut.Close()) + require.NoError(t, sut.Store(filename)) + + require.True(t, storeCalled) +} + +func TestWriteValidate(t *testing.T) { + filename := uuid.New().String() + fh, err := ioutil.TempFile("", filename) + if err != nil { + panic(err) + } + + filepath := fh.Name() + defer os.Remove(filepath) + + storeCalled := false + + sut := File{ + fh: fh, + store: func(string) error { + storeCalled = true + return nil + }, + } + + content := "some content to write to file" + n, err := sut.Write([]byte(content)) + require.Equal(t, len(content), n) + require.NoError(t, err) + + require.NoError(t, sut.Close()) + require.Error(t, sut.Store("workspace"+string(os.PathSeparator))) + require.False(t, storeCalled) + + require.Error(t, sut.Close()) + require.NoError(t, sut.Store("validfilename")) + require.Error(t, sut.Store("validfilename")) +} + +func TestPyramidReadFile(t *testing.T) { + filename := uuid.New().String() + filepath := path.Join("/tmp", filename) + content := "some content to write to file" + if err := ioutil.WriteFile(filepath, []byte(content), os.ModePerm); err != nil { + panic(err) + } + defer os.Remove(filepath) + + mockEv := newMockEviction() + + fh, err := os.Open(filepath) + if err != nil { + panic(err) + } + + sut := ROFile{ + fh: fh, + eviction: mockEv, + rPath: relativePath(filename), + } + + _, err = sut.Stat() + require.NoError(t, err) + + bytes := make([]byte, len(content)) + n, err := sut.Read(bytes) + require.NoError(t, err) + require.Equal(t, len(content), n) + require.Equal(t, content, string(bytes)) + require.NoError(t, sut.Close()) + + require.Equal(t, 2, mockEv.touchedTimes[relativePath(filename)]) +} + +type mockEviction struct { + touchedTimes map[relativePath]int +} + +func newMockEviction() *mockEviction { + return &mockEviction{ + touchedTimes: map[relativePath]int{}, + } +} + +func (me *mockEviction) touch(rPath relativePath) { + me.touchedTimes[rPath]++ +} + +func (me *mockEviction) store(_ relativePath, _ int64) int { + return 0 +} diff --git a/pyramid/params.go b/pyramid/params.go new file mode 100644 index 00000000000..e0306692e23 --- /dev/null +++ b/pyramid/params.go @@ -0,0 +1,15 @@ +package pyramid + +// Params is pyramid.FS params that are identical for all file-systems +// in a single lakeFS instance. +type Params struct { + // AllocatedBytes is the disk size in bytes that lakeFS is allowed to use. + AllocatedBytes int64 + + // BaseDir is the local directory where lakeFS app is storing the files. + BaseDir string + + // BlockStoragePrefix is the prefix prepended to lakeFS metadata files in + // the blockstore. + BlockStoragePrefix string +} diff --git a/pyramid/pyramid.go b/pyramid/pyramid.go new file mode 100644 index 00000000000..528b6e043bc --- /dev/null +++ b/pyramid/pyramid.go @@ -0,0 +1,14 @@ +package pyramid + +// FS is pyramid abstraction of filesystem where the persistent storage-layer is the block storage. +// Files on the local disk are transient and might be cleaned up by the eviction policy. +// File structure under a namespace and namespace itself are flat (no directories). +type FS interface { + // Create creates a new file in the FS. + // It will only be persistent after the returned file is stored. + Create(namespace string) (*File, error) + + // Open finds the referenced file and returns its read-only File. + // If file isn't in the local disk, it is fetched from the block storage. + Open(namespace, filename string) (*ROFile, error) +} diff --git a/pyramid/ro_file.go b/pyramid/ro_file.go new file mode 100644 index 00000000000..268efaeac2c --- /dev/null +++ b/pyramid/ro_file.go @@ -0,0 +1,33 @@ +package pyramid + +import ( + "os" +) + +// ROFile is pyramid wrapper for os.file that implements io.ReadCloser +// with hooks for updating evictions. +type ROFile struct { + fh *os.File + eviction eviction + + rPath relativePath +} + +func (f *ROFile) Read(p []byte) (n int, err error) { + f.eviction.touch(f.rPath) + return f.fh.Read(p) +} + +func (f *ROFile) ReadAt(p []byte, off int64) (n int, err error) { + f.eviction.touch(f.rPath) + return f.fh.ReadAt(p, off) +} + +func (f *ROFile) Stat() (os.FileInfo, error) { + f.eviction.touch(f.rPath) + return f.fh.Stat() +} + +func (f *ROFile) Close() error { + return f.fh.Close() +} diff --git a/pyramid/tierFS.go b/pyramid/tierFS.go new file mode 100644 index 00000000000..fb1d8f39e00 --- /dev/null +++ b/pyramid/tierFS.go @@ -0,0 +1,364 @@ +package pyramid + +import ( + "errors" + "fmt" + "io" + "os" + "path" + "path/filepath" + "runtime" + "strings" + + "github.com/treeverse/lakefs/logging" + + "github.com/google/uuid" + + "github.com/treeverse/lakefs/block" +) + +// TierFS is a filesystem where written files are never edited. +// All files are stored in the block storage. Local paths are treated as a +// cache layer that will be evicted according to the eviction control. +type TierFS struct { + adaptor block.Adapter + eviction eviction + logger logging.Logger + + fsName string + + fsLocalBaseDir string + + remotePrefix string +} + +type Config struct { + // fsName is the unique filesystem name for this TierFS instance. + // If two TierFS instances have the same name, behaviour is undefined. + fsName string + + adaptor block.Adapter + logger logging.Logger + + // Prefix for all metadata file lakeFS stores in the block storage. + fsBlockStoragePrefix string + + // The directory where TierFS files are kept locally. + localBaseDir string + + // Maximum number of bytes an instance of TierFS can allocate to local files. + // This is not a hard limit - there might be short period of times where TierFS + // uses more disk due to ongoing writes and slow disk cleanups. + allocatedDiskBytes int64 +} + +const workspaceDir = "workspace" + +// NewFS creates a new TierFS. +// It will traverse the existing local folders and will update +// the local disk cache to reflect existing files. +func NewFS(c *Config) (FS, error) { + fsLocalBaseDir := path.Join(c.localBaseDir, c.fsName) + if err := os.MkdirAll(fsLocalBaseDir, os.ModePerm); err != nil { + return nil, fmt.Errorf("creating base dir: %w", err) + } + + tierFS := &TierFS{ + adaptor: c.adaptor, + fsName: c.fsName, + logger: c.logger, + fsLocalBaseDir: fsLocalBaseDir, + remotePrefix: path.Join(c.fsBlockStoragePrefix, c.fsName), + } + eviction, err := newLRUSizeEviction(c.allocatedDiskBytes, tierFS.removeFromLocal) + if err != nil { + return nil, fmt.Errorf("creating eviction control: %w", err) + } + + if err := handleExistingFiles(eviction, fsLocalBaseDir); err != nil { + return nil, fmt.Errorf("handling existing files: %w", err) + } + + tierFS.eviction = eviction + return tierFS, nil +} + +// handleExistingFiles should only be called during init of the TierFS. +// It does 2 things: +// 1. Adds stored files to the eviction control +// 2. Remove workspace directories and all its content if it +// exist under the namespace dir. +func handleExistingFiles(eviction eviction, fsLocalBaseDir string) error { + if err := filepath.Walk(fsLocalBaseDir, func(rPath string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + if info.Name() == workspaceDir { + // skipping workspaces and saving them for later delete + if err := os.RemoveAll(rPath); err != nil { + return fmt.Errorf("removing dir: %w", err) + } + return filepath.SkipDir + } + return nil + } + + eviction.store(relativePath(rPath), info.Size()) + return nil + }); err != nil { + return fmt.Errorf("walking the fs dir: %w", err) + } + + return nil +} + +func (tfs *TierFS) removeFromLocal(rPath relativePath) { + // This will be called by the cache eviction mechanism during entry insert. + // We don't want to wait while the file is being removed from the local disk. + go removeFromLocal(tfs.logger, tfs.fsLocalBaseDir, rPath) +} + +func removeFromLocal(logger logging.Logger, fsLocalBaseDir string, rPath relativePath) { + p := path.Join(fsLocalBaseDir, string(rPath)) + if err := os.Remove(p); err != nil { + logger.WithError(err).WithField("path", p).Error("Removing file failed") + return + } + + dir := path.Dir(p) + empty, err := isDirEmpty(dir) + if err != nil { + logger.WithError(err).WithField("dir", dir).Error("Checking if dir empty failed") + return + } + if !empty { + return + } + + if err := os.Remove(dir); err != nil { + logger.WithError(err).WithField("dir", dir).Error("Removing dir failed") + } +} + +func isDirEmpty(name string) (bool, error) { + f, err := os.Open(name) + if err != nil { + return false, err + } + defer f.Close() + + _, err = f.Readdirnames(1) + if err == io.EOF { + return true, nil + } + return false, err +} + +func (tfs *TierFS) store(namespace, originalPath, filename string) error { + f, err := os.Open(originalPath) + if err != nil { + return fmt.Errorf("open file %s: %w", originalPath, err) + } + + stat, err := f.Stat() + if err != nil { + return fmt.Errorf("file stat %s: %w", originalPath, err) + } + + if err := tfs.adaptor.Put(tfs.objPointer(namespace, filename), stat.Size(), f, block.PutOpts{}); err != nil { + return fmt.Errorf("adapter put %s: %w", filename, err) + } + + if err := f.Close(); err != nil { + return fmt.Errorf("closing file %s: %w", filename, err) + } + + fileRef := tfs.newLocalFileRef(namespace, filename) + + tfs.eviction.store(fileRef.fsRelativePath, stat.Size()) + if err := os.MkdirAll(path.Dir(fileRef.fullPath), os.ModePerm); err != nil { + return fmt.Errorf("creating file dir %s: %w", originalPath, err) + } + if err := os.Rename(originalPath, fileRef.fullPath); err != nil { + return fmt.Errorf("rename file %s: %w", originalPath, err) + } + + return nil +} + +// Create creates a new file in TierFS. +// File isn't stored in TierFS until a successful close operation. +// Open(namespace, filename) calls will return an error before the close was called. +func (tfs *TierFS) Create(namespace string) (*File, error) { + if err := validateNamespace(namespace); err != nil { + return nil, fmt.Errorf("invalid args: %w", err) + } + + if err := tfs.createNSWorkspaceDir(namespace); err != nil { + return nil, fmt.Errorf("create namespace dir: %w", err) + } + + tempPath := tfs.workspaceTempFilePath(namespace) + fh, err := os.Create(tempPath) + if err != nil { + return nil, fmt.Errorf("creating file: %w", err) + } + + return &File{ + fh: fh, + store: func(filename string) error { + return tfs.store(namespace, tempPath, filename) + }, + }, nil +} + +// Load returns the a file descriptor to the local file. +// If the file is missing from the local disk, it will try to fetch it from the block storage. +func (tfs *TierFS) Open(namespace, filename string) (*ROFile, error) { + if err := validateArgs(namespace, filename); err != nil { + return nil, err + } + + fileRef := tfs.newLocalFileRef(namespace, filename) + fh, err := os.Open(fileRef.fullPath) + if err == nil { + return tfs.openFile(fileRef, fh) + } + if !os.IsNotExist(err) { + return nil, fmt.Errorf("open file: %w", err) + } + + fh, err = tfs.readFromBlockStorage(fileRef) + if err != nil { + return nil, err + } + + return tfs.openFile(fileRef, fh) +} + +// openFile converts an os.File to pyramid.File and updates the eviction control. +func (tfs *TierFS) openFile(fileRef localFileRef, fh *os.File) (*ROFile, error) { + stat, err := fh.Stat() + if err != nil { + return nil, fmt.Errorf("file stat: %w", err) + } + + // No need to check for the store output here, + // we already have the file. + tfs.eviction.store(fileRef.fsRelativePath, stat.Size()) + return &ROFile{ + fh: fh, + rPath: fileRef.fsRelativePath, + eviction: tfs.eviction, + }, nil +} + +// readFromBlockStorage reads the referenced file from the block storage +// and places it in the local FS for further reading. +// It returns a file handle to the local file. +func (tfs *TierFS) readFromBlockStorage(fileRef localFileRef) (*os.File, error) { + reader, err := tfs.adaptor.Get(tfs.objPointer(fileRef.namespace, fileRef.filename), 0) + if err != nil { + return nil, fmt.Errorf("read from block storage: %w", err) + } + defer reader.Close() + + writer, err := os.Create(fileRef.fullPath) + if err != nil { + return nil, fmt.Errorf("creating file: %w", err) + } + + if _, err := io.Copy(writer, reader); err != nil { + return nil, fmt.Errorf("copying date to file: %w", err) + } + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("writer close: %w", err) + } + + fh, err := os.Open(fileRef.fullPath) + if err != nil { + return nil, fmt.Errorf("open file: %w", err) + } + return fh, nil +} + +func validateArgs(namespace, filename string) error { + if err := validateNamespace(namespace); err != nil { + return err + } + return validateFilename(filename) +} + +var ( + errSeparatorInFS = errors.New("path contains separator") + errPathInWorkspace = errors.New("file cannot be located in the workspace") + errEmptyDirInPath = errors.New("file path cannot contain an empty directory") +) + +func validateFilename(filename string) error { + if strings.HasPrefix(filename, workspaceDir+string(os.PathSeparator)) { + return errPathInWorkspace + } + if strings.Contains(filename, strings.Repeat(string(os.PathSeparator), 2)) { + return errEmptyDirInPath + } + return nil +} + +func validateNamespace(ns string) error { + if strings.ContainsRune(ns, os.PathSeparator) { + return errSeparatorInFS + } + return nil +} + +// relativePath is the path of the file under TierFS +type relativePath string + +// localFileRef consists of all possible local file references +type localFileRef struct { + namespace string + filename string + + fullPath string + fsRelativePath relativePath +} + +func (tfs *TierFS) newLocalFileRef(namespace, filename string) localFileRef { + relative := path.Join(namespace, filename) + return localFileRef{ + namespace: namespace, + filename: filename, + + fsRelativePath: relativePath(relative), + fullPath: path.Join(tfs.fsLocalBaseDir, relative), + } +} + +func (tfs *TierFS) objPointer(namespace, filename string) block.ObjectPointer { + if runtime.GOOS == "windows" { + filename = strings.ReplaceAll(filename, `\\`, "/") + } + + return block.ObjectPointer{ + StorageNamespace: namespace, + Identifier: tfs.blockStoragePath(filename), + } +} + +func (tfs *TierFS) blockStoragePath(filename string) string { + return path.Join(tfs.remotePrefix, filename) +} + +func (tfs *TierFS) createNSWorkspaceDir(namespace string) error { + return os.MkdirAll(tfs.workspaceDirPath(namespace), os.ModePerm) +} + +func (tfs *TierFS) workspaceDirPath(namespace string) string { + return path.Join(tfs.fsLocalBaseDir, namespace, workspaceDir) +} + +func (tfs *TierFS) workspaceTempFilePath(namespace string) string { + return path.Join(tfs.workspaceDirPath(namespace), uuid.Must(uuid.NewRandom()).String()) +} diff --git a/pyramid/tierFS_test.go b/pyramid/tierFS_test.go new file mode 100644 index 00000000000..142344c4771 --- /dev/null +++ b/pyramid/tierFS_test.go @@ -0,0 +1,195 @@ +package pyramid + +import ( + "io/ioutil" + "os" + "path" + "strconv" + "testing" + + "github.com/thanhpk/randstr" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/block/mem" +) + +var ( + fs FS + adapter block.Adapter +) + +const blockStoragePrefix = "prefix" +const allocatedDiskBytes = 4 * 1024 * 1024 + +func TestMain(m *testing.M) { + fsName := uuid.New().String() + + // cleanup + defer func() { + if err := os.RemoveAll(path.Join(os.TempDir(), fsName)); err != nil { + panic(err) + } + }() + + adapter = mem.New() + var err error + fs, err = NewFS(&Config{ + fsName: fsName, + adaptor: adapter, + fsBlockStoragePrefix: blockStoragePrefix, + localBaseDir: os.TempDir(), + allocatedDiskBytes: allocatedDiskBytes, + }) + if err != nil { + panic(err) + } + + code := m.Run() + + os.Exit(code) +} + +func TestSimpleWriteRead(t *testing.T) { + namespace := uuid.New().String() + filename := "1/2/file1.txt" + + content := "hello world!" + writeToFile(t, namespace, filename, content) + checkContent(t, namespace, filename, content) +} + +func TestReadFailDuringWrite(t *testing.T) { + namespace := uuid.New().String() + filename := "file1" + f, err := fs.Create(namespace) + require.NoError(t, err) + + content := "some content" + n, err := f.Write([]byte(content)) + require.NoError(t, err) + require.Equal(t, len(content), n) + + readF, err := fs.Open(namespace, filename) + require.Nil(t, readF) + require.Error(t, err) + + require.NoError(t, f.Close()) + require.NoError(t, f.store(filename)) + checkContent(t, namespace, filename, content) +} + +func TestEvictionSingleNamespace(t *testing.T) { + testEviction(t, uuid.New().String()) +} + +func TestEvictionMultipleNamespaces(t *testing.T) { + testEviction(t, uuid.New().String(), + uuid.New().String(), + uuid.New().String()) +} + +func TestStartup(t *testing.T) { + fsName := uuid.New().String() + namespace := uuid.New().String() + + // cleanup + defer func() { + if err := os.RemoveAll(path.Join(os.TempDir(), fsName)); err != nil { + panic(err) + } + }() + + namespacePath := path.Join(os.TempDir(), fsName, namespace) + workspacePath := path.Join(namespacePath, workspaceDir) + if err := os.MkdirAll(workspacePath, os.ModePerm); err != nil { + panic(err) + } + + filename := "ThisShouldStay" + content := "This Should Stay - I'm telling You!!!!" + if err := ioutil.WriteFile(path.Join(namespacePath, filename), []byte(content), os.ModePerm); err != nil { + panic(err) + } + if err := ioutil.WriteFile(path.Join(workspacePath, "ThisShouldNotStay"), []byte("ThisShouldNotStay"), os.ModePerm); err != nil { + panic(err) + } + + localFS, err := NewFS(&Config{ + fsName: fsName, + adaptor: mem.New(), + fsBlockStoragePrefix: blockStoragePrefix, + localBaseDir: os.TempDir(), + allocatedDiskBytes: allocatedDiskBytes, + }) + if err != nil { + panic(err) + } + + dir, err := os.Open(workspacePath) + require.Nil(t, dir) + require.True(t, os.IsNotExist(err)) + + f, err := localFS.Open(namespace, filename) + defer f.Close() + require.NoError(t, err) + + bytes, err := ioutil.ReadAll(f) + require.NoError(t, err) + require.Equal(t, content, string(bytes)) + +} + +func testEviction(t *testing.T, namespaces ...string) { + // making sure to fill the cache + fileBytes := 512 * 1024 + numFiles := 5 * allocatedDiskBytes / fileBytes + // write + for i := 0; i < numFiles; i++ { + filename := "file_" + strconv.Itoa(i) + + content := randstr.String(fileBytes, "abcdefghijklmnopqrstuvwxyz") + writeToFile(t, namespaces[i%len(namespaces)], filename, content) + } + + // read + for i := 0; i < numFiles; i++ { + filename := "file_" + strconv.Itoa(i) + + f, err := fs.Open(namespaces[i%len(namespaces)], filename) + require.NoError(t, err) + + _, err = ioutil.ReadAll(f) + require.NoError(t, err) + require.NoError(t, f.Close()) + } +} + +func TestInvalidArgs(t *testing.T) { + f, err := fs.Create("not/a/valid/namespace") + require.Nil(t, f) + require.Error(t, err) +} + +func writeToFile(t *testing.T, namespace, filename, content string) { + f, err := fs.Create(namespace) + require.NoError(t, err) + + n, err := f.Write([]byte(content)) + require.NoError(t, err) + require.Equal(t, len(content), n) + + require.NoError(t, f.Close()) + require.NoError(t, f.Store(filename)) +} + +func checkContent(t *testing.T, namespace string, filename string, content string) { + f, err := fs.Open(namespace, filename) + require.NoError(t, err) + defer f.Close() + + bytes, err := ioutil.ReadAll(f) + require.NoError(t, err) + require.Equal(t, content, string(bytes)) +}