TierFS enhancements #1008

Merged
merged 7 commits into from
Dec 14, 2020
Changes from 5 commits
1 change: 1 addition & 0 deletions go.mod
@@ -70,6 +70,7 @@ require (
 	github.com/spf13/cobra v1.0.0
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.7.1
+	github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a
 	github.com/stretchr/testify v1.6.1
 	github.com/thanhpk/randstr v1.0.4
 	github.com/tidwall/pretty v1.0.1 // indirect
1 change: 1 addition & 0 deletions go.sum
@@ -1063,6 +1063,7 @@ github.com/ssgreg/nlreturn/v2 v2.0.1 h1:+lm6xFjVuNw/9t/Fh5sIwfNWefiD5bddzc6vwJ1T
 github.com/ssgreg/nlreturn/v2 v2.0.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
+github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a h1:AhmOdSHeswKHBjhsLs/7+1voOxT+LLrSk/Nxvk35fug=
 github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
 github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6VSaahyKadf4WtSsJIgne6A1WLOAGM8A=
 github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU=
84 changes: 84 additions & 0 deletions pyramid/directory.go
@@ -0,0 +1,84 @@
package pyramid

import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"
)

// directory synchronizes between file operations that might change (create/delete) directories
type directory struct {
// ceilingDir is the root directory of the FS; it must never be deleted
ceilingDir string
sync.Mutex
}

// deleteDirRecIfEmpty deletes the given directory if it is empty.
// It then keeps deleting each parent directory that is empty, stopping at ceilingDir.
func (d *directory) deleteDirRecIfEmpty(dir string) error {
d.Lock()
defer d.Unlock()

for dir != d.ceilingDir {
empty, err := isDirEmpty(dir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}
if !empty {
return nil
}

parentDir := path.Dir(dir)
if err := os.Remove(dir); err != nil {
return err
}
dir = parentDir
Contributor:

This loop can continue way up, above the planned location. I think directory needs a concept of a ceilingDir or firstAncestorDir or something.

Contributor Author:

I was planning for rooted workspace dir to back me up, but it's indeed flaky. Fixed.

Contributor:

Much better, but still fragile! If I can control the inputs to removeFromLocal to include .., then this loop happily steps above it.

E.g., if ceilingDir is /a/b/data and I convince the FS to work with a file named x/../../c/d/e/f, then I can happily destroy many things. Right now this is not severe because we do not allow user-controlled paths into the tiers. But e.g. in the (alternate!) future where we afford users more control over the final location of their files, we might end up with user-controlled paths.

I would be happier if we at least documented that paths are not clean and must never be user-controlled. Or we could resolve to actual paths and work from there. E.g. @nopcoder suggested filepath.Clean to resolve a somewhat similar case.

Contributor Author:

Adding documentation for now; I really can't see how the paths (other than the base directory) would become user-controlled in the future.

Contributor:

Cool. I don't particularly mind "user-controlled" paths when that user is the owner of lakeFS. I'm worried about "user-controlled" paths when they come from a lakeFS user, who is supposed only to have permissions to act on files inside lakeFS.
Thanks!

}

return nil
}
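
Sketch (not part of this PR): the review thread above suggests resolving paths with filepath.Clean before walking upwards. One possible guard, under that assumption, is to clean both paths and check that the target is a proper descendant of the ceiling via filepath.Rel. The helper name isStrictlyUnder is hypothetical, and the check is purely lexical: it does not resolve symlinks.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isStrictlyUnder reports whether target, after lexical cleaning, is a proper
// descendant of ceiling. Hypothetical helper for illustration only; symlinks
// are not resolved.
func isStrictlyUnder(ceiling, target string) bool {
	rel, err := filepath.Rel(filepath.Clean(ceiling), filepath.Clean(target))
	if err != nil {
		return false
	}
	if rel == "." {
		return false // target is the ceiling itself
	}
	// A relative path that is ".." or begins with "../" escapes the ceiling.
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

func main() {
	ceiling := "/a/b/data"
	for _, p := range []string{
		"/a/b/data/x/y",
		"/a/b/data/x/../../c/d/e/f", // the example from the review thread
		"/a/b/data",
	} {
		fmt.Printf("%-28s under ceiling: %v\n", p, isStrictlyUnder(ceiling, p))
	}
}
```

A caller such as deleteDirRecIfEmpty could refuse to proceed when a check like this fails, rather than relying on dir != ceilingDir alone to terminate the loop.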

func isDirEmpty(name string) (bool, error) {
f, err := os.Open(name)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1)
if errors.Is(err, io.EOF) {
return true, nil
}
return false, err
}

// createFile creates the file under the path and creates all parent dirs if missing.
func (d *directory) createFile(path string) (*os.File, error) {
d.Lock()
defer d.Unlock()

if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
return nil, fmt.Errorf("creating dir: %w", err)
}

return os.Create(path)
}

// renameFile moves the src file to dst, creating all missing parent dirs.
func (d *directory) renameFile(src, dst string) error {
d.Lock()
defer d.Unlock()

if err := os.MkdirAll(filepath.Dir(dst), os.ModePerm); err != nil {
return fmt.Errorf("creating dir: %w", err)
}

return os.Rename(src, dst)
}
80 changes: 80 additions & 0 deletions pyramid/directory_test.go
@@ -0,0 +1,80 @@
package pyramid

import (
"io/ioutil"
"os"
"path"
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestConcurrentCreateDeleteDir(t *testing.T) {
name, err := ioutil.TempDir("", "test-dir-")
require.NoError(t, err)
sut := directory{ceilingDir: name}

var wg sync.WaitGroup
concurrency := 1000
pathDir := path.Join(name, "a/b/c/")

for i := 0; i < concurrency; i++ {
// create and delete
filepath := path.Join(pathDir, strconv.Itoa(i))
wg.Add(2)
go func() {
f, err := sut.createFile(filepath)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.Remove(filepath))

wg.Done()
}()
// delete the folder; this sometimes succeeds, when the folder happens to be empty
go func() {
require.NoError(t, sut.deleteDirRecIfEmpty(pathDir))

wg.Done()
}()
}
// It doesn't really matter whether the dir still exists or what it contains.
// The point is that none of this panics.
wg.Wait()
}

func TestConcurrentRenameDeleteDir(t *testing.T) {
name, err := ioutil.TempDir("", "test-dir-")
require.NoError(t, err)
sut := directory{ceilingDir: name}

var wg sync.WaitGroup
concurrency := 1000
pathDir := path.Join(name, "a/b/c/")

for i := 0; i < concurrency; i++ {
// rename and delete
originalPath := path.Join(name, strconv.Itoa(i))
require.NoError(t, ioutil.WriteFile(originalPath, []byte("some data"), os.ModePerm))

filepath := path.Join(pathDir, strconv.Itoa(i))
wg.Add(2)
go func() {
err := sut.renameFile(originalPath, filepath)
require.NoError(t, err)
require.NoError(t, os.Remove(filepath))

wg.Done()
}()
// delete the folder; this sometimes succeeds, when the folder happens to be empty
go func() {
require.NoError(t, sut.deleteDirRecIfEmpty(pathDir))

wg.Done()
}()
}
// It doesn't really matter whether the dir still exists or what it contains.
// The point is that none of this panics.
wg.Wait()
}
6 changes: 3 additions & 3 deletions pyramid/eviction.go
@@ -17,9 +17,9 @@ type lruSizeEviction struct {
 	cache simplelru.LRUCache
 }

-func newLRUSizeEviction(capacity int64, evict func(rPath relativePath)) (eviction, error) {
-	cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, _ int64) {
-		evict(key.(relativePath))
+func newLRUSizeEviction(capacity int64, evict func(rPath relativePath, cost int64)) (eviction, error) {
+	cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, cost int64) {
+		evict(key.(relativePath), cost)
 	})
 	if err != nil {
 		return nil, fmt.Errorf("creating cache: %w", err)
40 changes: 8 additions & 32 deletions pyramid/file.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,21 @@
package pyramid

import (
"errors"
"fmt"
"os"
)

// File is pyramid wrapper for os.file that triggers pyramid hooks for file actions.
type File struct {
fh *os.File
*os.File

closed bool
persisted bool
store func(string) error
}

func (f *File) Read(p []byte) (n int, err error) {
return f.fh.Read(p)
}

func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
return f.fh.ReadAt(p, off)
}

func (f *File) Write(p []byte) (n int, err error) {
return f.fh.Write(p)
}

func (f *File) Stat() (os.FileInfo, error) {
return f.fh.Stat()
}

func (f *File) Sync() error {
return f.fh.Sync()
}

func (f *File) Close() error {
f.closed = true
return f.fh.Close()
}

var (
errAlreadyPersisted = fmt.Errorf("file is already persisted")
errFileNotClosed = fmt.Errorf("file isn't closed")
)

// Store copies the closed file to all tiers of the pyramid.
Expand All @@ -50,14 +24,16 @@ func (f *File) Store(filename string) error {
return err
}

err := f.File.Close()
if err != nil && !errors.Is(err, os.ErrClosed) {
return fmt.Errorf("closing file: %w", err)
}

if f.persisted {
return errAlreadyPersisted
}
if !f.closed {
return errFileNotClosed
}

err := f.store(filename)
err = f.store(filename)
if err == nil {
f.persisted = true
}
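
Sketch (not part of this PR): Store now closes the file itself and ignores the error when the caller has already closed it. The standalone program below illustrates why the errors.Is(err, os.ErrClosed) check is enough for that: a second Close on an *os.File returns an error that wraps os.ErrClosed. The names closeTolerant and demo are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// closeTolerant closes f and treats "already closed" as success.
// Hypothetical helper; it only mirrors the check added to Store.
func closeTolerant(f *os.File) error {
	if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
		return fmt.Errorf("closing file: %w", err)
	}
	return nil
}

// demo closes a temp file several times and reports any unexpected behavior.
func demo() error {
	f, err := os.CreateTemp("", "store-demo-")
	if err != nil {
		return err
	}
	defer os.Remove(f.Name())

	if err := closeTolerant(f); err != nil {
		return fmt.Errorf("first close: %w", err)
	}
	// The caller may already have closed the file; that must not fail.
	if err := closeTolerant(f); err != nil {
		return fmt.Errorf("second close: %w", err)
	}
	// A plain extra Close does return an error wrapping os.ErrClosed.
	if err := f.Close(); !errors.Is(err, os.ErrClosed) {
		return fmt.Errorf("expected os.ErrClosed, got %v", err)
	}
	return nil
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("unexpected:", err)
		return
	}
	fmt.Println("double close tolerated")
}
```

This is what lets the PR drop the closed flag and errFileNotClosed: callers no longer need to close before calling Store, and closing first is still harmless.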
6 changes: 3 additions & 3 deletions pyramid/file_test.go
@@ -24,7 +24,7 @@ func TestPyramidWriteFile(t *testing.T) {

 	storeCalled := false
 	sut := File{
-		fh: fh,
+		File: fh,
 		store: func(string) error {
 			storeCalled = true
 			return nil
@@ -59,7 +59,7 @@ func TestWriteValidate(t *testing.T) {
 	storeCalled := false

 	sut := File{
-		fh: fh,
+		File: fh,
 		store: func(string) error {
 			storeCalled = true
 			return nil
@@ -97,7 +97,7 @@ func TestPyramidReadFile(t *testing.T) {
 	}

 	sut := ROFile{
-		fh: fh,
+		File: fh,
 		eviction: mockEv,
 		rPath: relativePath(filename),
 	}
43 changes: 43 additions & 0 deletions pyramid/metrics.go
@@ -0,0 +1,43 @@
package pyramid

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// nolint: gomnd
const (
kb = float64(1024)
mb = 1024 * kb
fsNameLabel = "fsName"
errorTypeLabel = "type"
accessStatusLabel = "status"
)

var cacheAccess = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "tier_fs_cache_hits_total",
Help: "TierFS cache hits total count",
}, []string{fsNameLabel, accessStatusLabel})

var errorsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "tier_fs_errors_total",
Help: "TierFS errors by type",
}, []string{fsNameLabel, errorTypeLabel})

var evictionHistograms = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tier_fs_eviction_bytes",
Help: "TierFS evicted object size by bytes",
Buckets: []float64{0.5 * kb, 1 * kb, 16 * kb, 32 * kb, 128 * kb, 512 * kb, 1 * mb, 2 * mb, 4 * mb, 8 * mb, 16 * mb, 64 * mb},
Contributor:

It will be hard to compare buckets of these sizes, they are not equally-spaced on a log scale. The multipliers between successive elements should be constant, here they are 2, 16, 2, 4, 4, 2, 2, 2, 2, 2, 4. I think ExponentialBuckets does exactly that.

Contributor:

^^^^^?

},
[]string{fsNameLabel})
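
Sketch (not part of this PR): the review comment above asks for buckets with a constant multiplier and points at ExponentialBuckets. The Prometheus Go client provides prometheus.ExponentialBuckets(start, factor, count) for this. To stay dependency-free, the program below reimplements the same idea in a local, hypothetical helper named exponentialBuckets. The factor of 2 and the count of 18 are assumptions, chosen to cover the same 0.5 KiB to 64 MiB range as the hand-written list.

```go
package main

import "fmt"

// exponentialBuckets returns count upper bounds, starting at start, where each
// bound is factor times the previous one. Local stand-in for illustration.
func exponentialBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	const kb = 1024.0
	// 0.5 KiB doubled 17 times reaches 64 MiB, so 18 buckets span the range.
	for _, b := range exponentialBuckets(0.5*kb, 2, 18) {
		fmt.Printf("%.0f\n", b)
	}
}
```

With a constant factor, every bucket has the same width on a log scale, which makes neighbouring buckets directly comparable.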

var downloadHistograms = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tier_fs_download_bytes",
Help: "TierFS download from block-store object size by bytes",
Buckets: []float64{0.5 * kb, 1 * kb, 16 * kb, 32 * kb, 128 * kb, 512 * kb, 1 * mb, 2 * mb, 4 * mb, 8 * mb, 16 * mb, 64 * mb},
},
[]string{fsNameLabel})
12 changes: 4 additions & 8 deletions pyramid/ro_file.go
@@ -7,27 +7,23 @@ import (
 // ROFile is a pyramid wrapper for os.File that implements io.ReadCloser
 // with hooks for updating evictions.
 type ROFile struct {
-	fh *os.File
+	*os.File
 	eviction eviction

 	rPath relativePath
 }

 func (f *ROFile) Read(p []byte) (n int, err error) {
 	f.eviction.touch(f.rPath)
-	return f.fh.Read(p)
+	return f.File.Read(p)
 }

 func (f *ROFile) ReadAt(p []byte, off int64) (n int, err error) {
 	f.eviction.touch(f.rPath)
-	return f.fh.ReadAt(p, off)
+	return f.File.ReadAt(p, off)
 }

 func (f *ROFile) Stat() (os.FileInfo, error) {
 	f.eviction.touch(f.rPath)
-	return f.fh.Stat()
-}
-
-func (f *ROFile) Close() error {
-	return f.fh.Close()
+	return f.File.Stat()
 }
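
Sketch (not part of this PR): replacing the fh field with an embedded *os.File is what allows the pass-through methods such as Close to be deleted. Go promotes the embedded type's methods to the outer struct, and only the methods that need a hook are redeclared. The program below shows the pattern with a hypothetical countingFile type, where a counter stands in for eviction.touch.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// countingFile embeds *os.File, so Close, Name, Seek and the rest are promoted.
// Read is redeclared to run a hook first (a counter standing in for
// eviction.touch). Hypothetical type for illustration only.
type countingFile struct {
	*os.File
	touches int
}

func (f *countingFile) Read(p []byte) (int, error) {
	f.touches++
	return f.File.Read(p)
}

// readThroughHook writes content to a temp file, reads it back through
// countingFile, and returns the data with the number of hooked Read calls.
func readThroughHook(content string) (string, int, error) {
	tmp, err := os.CreateTemp("", "embed-demo-")
	if err != nil {
		return "", 0, err
	}
	defer os.Remove(tmp.Name())

	cf := &countingFile{File: tmp}
	defer cf.Close() // harmless if the explicit Close below already ran

	if _, err := cf.WriteString(content); err != nil { // promoted method
		return "", 0, err
	}
	if _, err := cf.Seek(0, io.SeekStart); err != nil { // promoted method
		return "", 0, err
	}
	data, err := io.ReadAll(cf) // calls the redeclared Read
	if err != nil {
		return "", 0, err
	}
	// Close is not declared on countingFile; it is promoted from *os.File.
	if err := cf.Close(); err != nil {
		return "", 0, err
	}
	return string(data), cf.touches, nil
}

func main() {
	data, touches, err := readThroughHook("some data")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("read %q with %d hooked Read calls\n", data, touches)
}
```

One trade-off of embedding is that every promoted method that is not redeclared bypasses the hook, so the outer type exposes more of *os.File than the hand-written wrappers did.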