Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immutable tiered storage #962

Merged
merged 16 commits into from
Dec 6, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first

DefaultDiskAllocatedBytes = 1 * 1024 * 1024 * 1024
DefaultDiskBaseDir = "~/lakefs/metadata"
DefaultDiskBaseDir = "~/lakefs/local_tier"
DefaultDiskBlockStoragePrefix = "_lakefs"

DefaultBlockStoreGSS3Endpoint = "https://storage.googleapis.com"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe // indirect
github.com/cznic/mathutil v0.0.0-20180504122225-ca4c9f2c1369
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/ristretto v0.0.3
github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654 // indirect
github.com/dlmiddlecote/sqlstats v1.0.1
github.com/georgysavva/scany v0.2.6
Expand Down Expand Up @@ -74,6 +73,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/thanhpk/randstr v1.0.4
github.com/tidwall/pretty v1.0.1 // indirect
github.com/treeverse/golang-lru v0.6.1
github.com/tsenart/vegeta/v12 v12.8.3
github.com/vbauerster/mpb/v5 v5.3.0
github.com/xitongsys/parquet-go v1.5.2
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,8 @@ github.com/denis-tingajkin/go-header v0.3.1 h1:ymEpSiFjeItCy1FOP+x0M2KdCELdEAHUs
github.com/denis-tingajkin/go-header v0.3.1/go.mod h1:sq/2IxMhaZX+RRcgHfCRx/m0M5na0fBt4/CRe7Lrji0=
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgraph-io/ristretto v0.0.3 h1:jh22xisGBjrEVnRZ1DVTpBVQm0Xndu8sMl0CWDzSIBI=
github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051 h1:ByJUvQYyTtNNCVfYNM48q6uYUT4fAlN0wNmd3th4BSo=
github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654 h1:XOPLOMn/zT4jIgxfxSsoXPxkrzz0FaCHwp33x5POJ+Q=
Expand Down Expand Up @@ -1103,6 +1099,8 @@ github.com/tommy-muehle/go-mnd v1.3.1-0.20200224220436-e6f9a994e8fa h1:RC4maTWLK
github.com/tommy-muehle/go-mnd v1.3.1-0.20200224220436-e6f9a994e8fa/go.mod h1:dSUh0FtTP8VhvkL1S+gUR1OKd9ZnSaozuI6r3m6wOig=
github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9rrstGQ=
github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM=
github.com/treeverse/golang-lru v0.6.1 h1:G7nAh8gqSChNEwRpC4MopsIhKvE5cJVQLgKsZhr6cmg=
github.com/treeverse/golang-lru v0.6.1/go.mod h1:q66C/bXLqkppmXQPBfsdbuzj1ELfQeOc4/SMS3SmDrc=
github.com/tsenart/go-tsz v0.0.0-20180814232043-cdeb9e1e981e/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo=
github.com/tsenart/vegeta/v12 v12.8.3 h1:UEsDkSrEJojMKW/xr7KUv4H/bYykX+V48KCsPZPqEfk=
github.com/tsenart/vegeta/v12 v12.8.3/go.mod h1:ZiJtwLn/9M4fTPdMY7bdbIeyNeFVE8/AHbWFqCsUuho=
Expand Down
42 changes: 16 additions & 26 deletions pyramid/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,23 @@ package pyramid
import (
"fmt"

"github.com/dgraph-io/ristretto"
lru "github.com/treeverse/golang-lru"
"github.com/treeverse/golang-lru/simplelru"
)

type lruSizeEviction struct {
cache *ristretto.Cache
// eviction is an abstraction of the eviction control for easy testing
type eviction interface {
touch(rPath relativePath)
store(rPath relativePath, filesize int64) int
}

const (
// Per ristretto, this is the optimized (static) buffer items
bufferItems = 64
)
type lruSizeEviction struct {
cache simplelru.LRUCache
}

func newLRUSizeEviction(capacity, estimatedFileBytes int64, evict func(rPath relativePath)) (*lruSizeEviction, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
// Per ristretto, this is the optimized counters num
NumCounters: int64(10.0 * float64(capacity) / float64(estimatedFileBytes)),
MaxCost: capacity,
Metrics: false,
OnEvict: onEvict(evict),
BufferItems: bufferItems,
func newLRUSizeEviction(capacity int64, evict func(rPath relativePath)) (eviction, error) {
cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, _ int64) {
evict(key.(relativePath))
})
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
Expand All @@ -32,18 +29,11 @@ func newLRUSizeEviction(capacity, estimatedFileBytes int64, evict func(rPath rel
}, nil
}

func onEvict(evict func(rPath relativePath)) func(uint64, uint64, interface{}, int64) {
return func(_, _ uint64, value interface{}, _ int64) {
evict(value.(relativePath))
}
}

func (am *lruSizeEviction) touch(rPath relativePath) {
am.cache.Get(string(rPath))
// update last access time, value is meaningless
am.cache.Get(rPath)
}

func (am *lruSizeEviction) store(rPath relativePath, filesize int64) bool {
// must store the path as value since the returned key is the hash,
// not the actual key.
return am.cache.Set(string(rPath), rPath, filesize)
func (am *lruSizeEviction) store(rPath relativePath, filesize int64) int {
return am.cache.Add(rPath, nil, filesize)
}
53 changes: 28 additions & 25 deletions pyramid/file.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
package pyramid
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved

import (
"errors"
"fmt"
"os"
)

// File is pyramid wrapper for os.file that triggers pyramid hooks for file actions.
type File struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the file name kept?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temp file name, which is only relevant during the write itself, is held by the os.File handle.
The final filename is passed during Store.

fh *os.File
eviction *lruSizeEviction
fh *os.File

readOnly bool
rPath relativePath

store func() error
closed bool
persisted bool
store func(string) error
}

var ErrReadOnlyFile = errors.New("file is read-only")

func (f *File) Read(p []byte) (n int, err error) {
if f.readOnly {
// file is being written, eviction policies don't apply to it
f.eviction.touch(f.rPath)
}
return f.fh.Read(p)
}

func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

off confused me. please change to offset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather keep the same os.File terminology :)

if f.readOnly {
// file is being written, eviction policies don't apply to it
f.eviction.touch(f.rPath)
}
return f.fh.ReadAt(p, off)
}

func (f *File) Write(p []byte) (n int, err error) {
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
if f.readOnly {
return 0, ErrReadOnlyFile
}

return f.fh.Write(p)
}

Expand All @@ -51,12 +35,31 @@ func (f *File) Sync() error {
}

func (f *File) Close() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be simpler to use Store instead of Close. Store will close the file.
If someone wants to close without storing - he/she should use of.File

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to unify them. But Close may be called by a different layer, e.g. while closing sstable writer.

if err := f.fh.Close(); err != nil {
f.closed = true
return f.fh.Close()
}

var (
errAlreadyPersisted = fmt.Errorf("file is already persisted")
errFileNotClosed = fmt.Errorf("file isn't closed")
)

// Store copies the closed file to all tiers of the pyramid.
func (f *File) Store(filename string) error {
if err := validateFilename(filename); err != nil {
return err
}

if f.store == nil {
return nil
if f.persisted {
return errAlreadyPersisted
}
if !f.closed {
return errFileNotClosed
}

err := f.store(filename)
if err == nil {
f.persisted = true
}
return f.store()
return err
}
134 changes: 134 additions & 0 deletions pyramid/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package pyramid

import (
"io/ioutil"
"os"
"path"
"testing"

"github.com/stretchr/testify/require"

"github.com/google/uuid"
)

func TestPyramidWriteFile(t *testing.T) {
filename := uuid.Must(uuid.NewRandom()).String()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of UUIDs or of random data in tests, but if we're going to use them then this form is documented as exactly the above:

Suggested change
filename := uuid.Must(uuid.NewRandom()).String()
filename := uuid.New().String()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed - need to some randomness so that subsequent runs after a fail test won't look at the same data (if failed to be deleted)

filepath := path.Join("/tmp", filename)
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
defer os.Remove(filepath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be after create

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated comment.


fh, err := os.Create(filepath)
if err != nil {
panic(err)
}

storeCalled := false

sut := File{
fh: fh,
store: func(string) error {
storeCalled = true
return nil
},
}

content := "some content to write to file"
n, err := sut.Write([]byte(content))
require.Equal(t, len(content), n)
require.NoError(t, err)
require.NoError(t, sut.Sync())

_, err = sut.Stat()
require.NoError(t, err)

require.NoError(t, sut.Close())
require.NoError(t, sut.Store(filename))

require.True(t, storeCalled)
}

func TestWriteValidate(t *testing.T) {
filename := uuid.Must(uuid.NewRandom()).String()
filepath := path.Join("/tmp", filename)
defer os.Remove(filepath)

fh, err := os.Create(filepath)
if err != nil {
panic(err)
}

storeCalled := false

sut := File{
fh: fh,
store: func(string) error {
storeCalled = true
return nil
},
}

content := "some content to write to file"
n, err := sut.Write([]byte(content))
require.Equal(t, len(content), n)
require.NoError(t, err)

require.NoError(t, sut.Close())
require.Error(t, sut.Store("workspace"+string(os.PathSeparator)))
require.False(t, storeCalled)

require.Error(t, sut.Close())
require.NoError(t, sut.Store("validfilename"))
require.Error(t, sut.Store("validfilename"))
}

func TestPyramidReadFile(t *testing.T) {
filename := uuid.Must(uuid.NewRandom()).String()
filepath := path.Join("/tmp", filename)
content := "some content to write to file"
if err := ioutil.WriteFile(filepath, []byte(content), os.ModePerm); err != nil {
panic(err)
}
defer os.Remove(filepath)

mockEv := newMockEviction()

fh, err := os.Open(filepath)
if err != nil {
panic(err)
}

sut := ROFile{
fh: fh,
eviction: mockEv,
rPath: relativePath(filename),
}

_, err = sut.Stat()
require.NoError(t, err)

bytes := make([]byte, len(content))
n, err := sut.Read(bytes)
require.NoError(t, err)
require.Equal(t, len(content), n)
require.Equal(t, content, string(bytes))
require.NoError(t, sut.Close())

require.Equal(t, 1, mockEv.touchedTimes[relativePath(filename)])
}

type mockEviction struct {
touchedTimes map[relativePath]int
}

func newMockEviction() *mockEviction {
return &mockEviction{
touchedTimes: map[relativePath]int{},
}
}

func (me *mockEviction) touch(rPath relativePath) {
me.touchedTimes[rPath]++
}

func (me *mockEviction) store(_ relativePath, _ int64) int {
return 0
}
13 changes: 4 additions & 9 deletions pyramid/pyramid.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ package pyramid
// Files on the local disk are transient and might be cleaned up by the eviction policy.
// File structure under a namespace and namespace itself are flat (no directories).
type FS interface {
// Store adds the file from the originalPath to the FS. It uploads the file to the
// block-storage and to the localpath.
// Once completed, it will not be available from the original path.
Store(namespace, originalPath, filename string) error

// Create creates a new file in the FS.
// It will only be persistent after the returned file is closed.
Create(namespace, filename string) (*File, error)
// It will only be persistent after the returned file is stored.
Create(namespace string) (*File, error)

// Open finds the referenced file and returns the file descriptor.
// Open finds the referenced file and returns its read-only File.
// If file isn't in the local disk, it is fetched from the block storage.
Open(namespace, filename string) (*File, error)
Open(namespace, filename string) (*ROFile, error)
}
32 changes: 32 additions & 0 deletions pyramid/ro_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pyramid

import (
"os"
)

// ROFile is pyramid wrapper for os.file that implements io.ReadCloser
// with hooks for updating evictions.
type ROFile struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File should be WOFile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? it's also readable.

fh *os.File
eviction eviction

rPath relativePath
}

func (f *ROFile) Read(p []byte) (n int, err error) {
f.eviction.touch(f.rPath)
return f.fh.Read(p)
}

func (f *ROFile) ReadAt(p []byte, off int64) (n int, err error) {
f.eviction.touch(f.rPath)
return f.fh.ReadAt(p, off)
}

func (f *ROFile) Stat() (os.FileInfo, error) {
return f.fh.Stat()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do want to touch here. But... as long as we collect useful metrics from pyramid (not necessarily in this PR!), that's OK either way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a touch - metrics in next PR.

}

func (f *ROFile) Close() error {
return f.fh.Close()
}
Loading