Skip to content

Commit

Permalink
TierFS enhancements (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 authored Dec 14, 2020
1 parent ec39a82 commit 2a70506
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 105 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a
github.com/stretchr/testify v1.6.1
github.com/thanhpk/randstr v1.0.4
github.com/tidwall/pretty v1.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,7 @@ github.com/ssgreg/nlreturn/v2 v2.0.1 h1:+lm6xFjVuNw/9t/Fh5sIwfNWefiD5bddzc6vwJ1T
github.com/ssgreg/nlreturn/v2 v2.0.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a h1:AhmOdSHeswKHBjhsLs/7+1voOxT+LLrSk/Nxvk35fug=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6VSaahyKadf4WtSsJIgne6A1WLOAGM8A=
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU=
Expand Down
86 changes: 86 additions & 0 deletions pyramid/directory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package pyramid

import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"
)

// directory synchronizes between file operations that might change (create/delete) directories
type directory struct {
// ceilingDir is the root directory of the FS - shouldn't never be deleted
ceilingDir string
mu sync.Mutex
}

// deleteDirRecIfEmpty deletes the given directory if it is empty.
// It will continue to delete all parents directory if they are empty, until the ceilingDir.
// Passed dir path isn't checked for malicious referencing (e.g. "../../../usr") and should never be
// controlled by any user input.
func (d *directory) deleteDirRecIfEmpty(dir string) error {
d.mu.Lock()
defer d.mu.Unlock()

for dir != d.ceilingDir {
empty, err := isDirEmpty(dir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}
if !empty {
return nil
}

parentDir := path.Dir(dir)
if err := os.Remove(dir); err != nil {
return err
}
dir = parentDir
}

return nil
}

func isDirEmpty(name string) (bool, error) {
f, err := os.Open(name)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1)
if errors.Is(err, io.EOF) {
return true, nil
}
return false, err
}

// createFile creates the file under the path and creates all parent dirs if missing.
func (d *directory) createFile(path string) (*os.File, error) {
d.mu.Lock()
defer d.mu.Unlock()

if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
return nil, fmt.Errorf("creating dir: %w", err)
}

return os.Create(path)
}

// renameFile will move the src file to dst location and creates all parent dirs if missing.
func (d *directory) renameFile(src, dst string) error {
d.mu.Lock()
defer d.mu.Unlock()

if err := os.MkdirAll(filepath.Dir(dst), os.ModePerm); err != nil {
return fmt.Errorf("creating dir: %w", err)
}

return os.Rename(src, dst)
}
80 changes: 80 additions & 0 deletions pyramid/directory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package pyramid

import (
"io/ioutil"
"os"
"path"
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestConcurrentCreateDeleteDir(t *testing.T) {
name, err := ioutil.TempDir("", "test-dir-")
require.NoError(t, err)
sut := directory{ceilingDir: name}

var wg sync.WaitGroup
concurrency := 1000
pathDir := path.Join(name, "a/b/c/")

for i := 0; i < concurrency; i++ {
// create and delete
filepath := path.Join(pathDir, strconv.Itoa(i))
wg.Add(2)
go func() {
f, err := sut.createFile(filepath)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.Remove(filepath))

wg.Done()
}()
// delete folder - this will sometime succeed if the folder is empty
go func() {
require.NoError(t, sut.deleteDirRecIfEmpty(pathDir))

wg.Done()
}()
}
// It doesn't really matter if the dir exists and its content.
// It's more about not panicking thru all of this
wg.Wait()
}

func TestConcurrentRenameDeleteDir(t *testing.T) {
name, err := ioutil.TempDir("", "test-dir-")
require.NoError(t, err)
sut := directory{ceilingDir: name}

var wg sync.WaitGroup
concurrency := 1000
pathDir := path.Join(name, "a/b/c/")

for i := 0; i < concurrency; i++ {
// create and delete
originalPath := path.Join(name, strconv.Itoa(i))
require.NoError(t, ioutil.WriteFile(originalPath, []byte("some data"), os.ModePerm))

filepath := path.Join(pathDir, strconv.Itoa(i))
wg.Add(2)
go func() {
err := sut.renameFile(originalPath, filepath)
require.NoError(t, err)
require.NoError(t, os.Remove(filepath))

wg.Done()
}()
// delete folder - this will sometime succeed if the folder is empty
go func() {
require.NoError(t, sut.deleteDirRecIfEmpty(pathDir))

wg.Done()
}()
}
// It doesn't really matter if the dir exists and its content.
// It's more about not panicking thru all of this
wg.Wait()
}
6 changes: 3 additions & 3 deletions pyramid/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type lruSizeEviction struct {
cache simplelru.LRUCache
}

func newLRUSizeEviction(capacity int64, evict func(rPath relativePath)) (eviction, error) {
cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, _ int64) {
evict(key.(relativePath))
func newLRUSizeEviction(capacity int64, evict func(rPath relativePath, cost int64)) (eviction, error) {
cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, cost int64) {
evict(key.(relativePath), cost)
})
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
Expand Down
40 changes: 8 additions & 32 deletions pyramid/file.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,21 @@
package pyramid

import (
"errors"
"fmt"
"os"
)

// File is pyramid wrapper for os.file that triggers pyramid hooks for file actions.
type File struct {
fh *os.File
*os.File

closed bool
persisted bool
store func(string) error
}

func (f *File) Read(p []byte) (n int, err error) {
return f.fh.Read(p)
}

func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
return f.fh.ReadAt(p, off)
}

func (f *File) Write(p []byte) (n int, err error) {
return f.fh.Write(p)
}

func (f *File) Stat() (os.FileInfo, error) {
return f.fh.Stat()
}

func (f *File) Sync() error {
return f.fh.Sync()
}

func (f *File) Close() error {
f.closed = true
return f.fh.Close()
}

var (
errAlreadyPersisted = fmt.Errorf("file is already persisted")
errFileNotClosed = fmt.Errorf("file isn't closed")
)

// Store copies the closed file to all tiers of the pyramid.
Expand All @@ -50,14 +24,16 @@ func (f *File) Store(filename string) error {
return err
}

err := f.File.Close()
if err != nil && !errors.Is(err, os.ErrClosed) {
return fmt.Errorf("closing file: %w", err)
}

if f.persisted {
return errAlreadyPersisted
}
if !f.closed {
return errFileNotClosed
}

err := f.store(filename)
err = f.store(filename)
if err == nil {
f.persisted = true
}
Expand Down
6 changes: 3 additions & 3 deletions pyramid/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestPyramidWriteFile(t *testing.T) {

storeCalled := false
sut := File{
fh: fh,
File: fh,
store: func(string) error {
storeCalled = true
return nil
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestWriteValidate(t *testing.T) {
storeCalled := false

sut := File{
fh: fh,
File: fh,
store: func(string) error {
storeCalled = true
return nil
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestPyramidReadFile(t *testing.T) {
}

sut := ROFile{
fh: fh,
File: fh,
eviction: mockEv,
rPath: relativePath(filename),
}
Expand Down
42 changes: 42 additions & 0 deletions pyramid/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pyramid

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// nolint: gomnd
const (
kb = float64(1024)
fsNameLabel = "fsName"
errorTypeLabel = "type"
accessStatusLabel = "status"
)

var cacheAccess = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "tier_fs_cache_hits_total",
Help: "TierFS cache hits total count",
}, []string{fsNameLabel, accessStatusLabel})

var errorsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "tier_fs_errors_total",
Help: "TierFS errors by type",
}, []string{fsNameLabel, errorTypeLabel})

var evictionHistograms = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tier_fs_eviction_bytes",
Help: "TierFS evicted object size by bytes",
Buckets: prometheus.ExponentialBuckets(kb, 4, 7),
},
[]string{fsNameLabel})

var downloadHistograms = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tier_fs_download_bytes",
Help: "TierFS download from block-store object size by bytes",
Buckets: prometheus.ExponentialBuckets(kb, 4, 7),
},
[]string{fsNameLabel})
12 changes: 4 additions & 8 deletions pyramid/ro_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,23 @@ import (
// ROFile is pyramid wrapper for os.file that implements io.ReadCloser
// with hooks for updating evictions.
type ROFile struct {
fh *os.File
*os.File
eviction eviction

rPath relativePath
}

func (f *ROFile) Read(p []byte) (n int, err error) {
f.eviction.touch(f.rPath)
return f.fh.Read(p)
return f.File.Read(p)
}

func (f *ROFile) ReadAt(p []byte, off int64) (n int, err error) {
f.eviction.touch(f.rPath)
return f.fh.ReadAt(p, off)
return f.File.ReadAt(p, off)
}

func (f *ROFile) Stat() (os.FileInfo, error) {
f.eviction.touch(f.rPath)
return f.fh.Stat()
}

func (f *ROFile) Close() error {
return f.fh.Close()
return f.File.Stat()
}
Loading

0 comments on commit 2a70506

Please sign in to comment.