Skip to content

Commit

Permalink
Add abort option to pyramid and sstable writer (#1107)
Browse files Browse the repository at this point in the history
itaiad200 authored Dec 27, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 6bce641 commit 07e7b31
Showing 9 changed files with 174 additions and 313 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -73,5 +73,6 @@ terraform.rc

# generated mock files
/graveler/committed/**/mock/
/pyramid/mock/*

.eslintcache
3 changes: 3 additions & 0 deletions graveler/committed/part.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,9 @@ type Writer interface {

// Close flushes all records to the disk and returns the WriteResult.
Close() (*WriteResult, error)

// Abort terminates the non-closed file and removes all traces.
Abort() error
}

// BatchWriterCloser collects part writers and handles the asynchronous
14 changes: 13 additions & 1 deletion graveler/committed/sstable/writer.go
Original file line number Diff line number Diff line change
@@ -81,9 +81,21 @@ func (dw *DiskWriter) writeHashWithLen(buf []byte) error {
return nil
}

func (dw *DiskWriter) Abort() error {
if err := dw.w.Close(); err != nil {
return fmt.Errorf("sstable file close: %w", err)
}

if err := dw.fh.Abort(); err != nil {
return fmt.Errorf("sstable file abort: %w", err)
}

return nil
}

func (dw *DiskWriter) Close() (*committed.WriteResult, error) {
if err := dw.w.Close(); err != nil {
return nil, fmt.Errorf("sstable file write: %w", err)
return nil, fmt.Errorf("sstable file close: %w", err)
}

sstableID := fmt.Sprintf("%x", dw.hash.Sum(nil))
33 changes: 33 additions & 0 deletions graveler/committed/sstable/writer_test.go
Original file line number Diff line number Diff line change
@@ -65,6 +65,39 @@ func TestWriter(t *testing.T) {
require.Equal(t, committed.ID(f), wr.PartID)
}

func TestWriterAbort(t *testing.T) {
ctrl := gomock.NewController(t)
mockFS := mock.NewMockFS(ctrl)
defer ctrl.Finish()
ns := committed.Namespace("some-namespace")

// create the mock file with the matching file-system
mockFile := mock.NewMockStoredFile(ctrl)
mockFile.EXPECT().Abort().Return(nil).Times(1)
mockFile.EXPECT().Close().Return(nil).Times(1)
mockFS.EXPECT().Create(string(ns)).Return(mockFile, nil)

dw, err := sstable.NewDiskWriter(mockFS, ns, sha256.New())
require.NoError(t, err)
require.NotNil(t, dw)

// expect the specific write file actions
mockFile.EXPECT().Write(gomock.Any()).DoAndReturn(
func(b []byte) (int, error) {
return len(b), nil
}).Times(1)
mockFile.EXPECT().Sync().Return(nil).AnyTimes()

// Do the actual writing
err = dw.WriteRecord(committed.Record{
Key: []byte("key-1"),
Value: []byte("some-data"),
})

// Abort
require.NoError(t, dw.Abort())
}

func randomStrings(writes int) []string {
var keys []string
for i := 0; i < writes; i++ {
78 changes: 78 additions & 0 deletions pyramid/file_test.go
Original file line number Diff line number Diff line change
@@ -23,12 +23,17 @@ func TestPyramidWriteFile(t *testing.T) {
defer os.Remove(filepath)

storeCalled := false
abortCalled := false
sut := WRFile{
File: fh,
store: func(string) error {
storeCalled = true
return nil
},
abort: func() error {
abortCalled = true
return nil
},
}

content := "some content to write to file"
@@ -44,6 +49,9 @@ func TestPyramidWriteFile(t *testing.T) {
require.NoError(t, sut.Store(filename))

require.True(t, storeCalled)

require.Error(t, sut.Abort())
require.False(t, abortCalled)
}

func TestWriteValidate(t *testing.T) {
@@ -76,10 +84,80 @@ func TestWriteValidate(t *testing.T) {
require.False(t, storeCalled)

require.Error(t, sut.Close())
}

func TestMultipleWriteCalls(t *testing.T) {
filename := uuid.New().String()
fh, err := ioutil.TempFile("", filename)
if err != nil {
t.Fatal("Failed to create temp file", err)
}

filepath := fh.Name()
defer os.Remove(filepath)

storeCalled := false

sut := WRFile{
File: fh,
store: func(string) error {
storeCalled = true
return nil
},
}

content := "some content to write to file"
n, err := sut.Write([]byte(content))
require.Equal(t, len(content), n)
require.NoError(t, err)

require.NoError(t, sut.Close())
require.NoError(t, sut.Store("validfilename"))
require.True(t, storeCalled)

require.Error(t, sut.Store("validfilename"))
}

func TestAbort(t *testing.T) {
filename := uuid.New().String()
fh, err := ioutil.TempFile("", filename)
if err != nil {
t.Fatal("Failed to create temp file", err)
}

filepath := fh.Name()
defer os.Remove(filepath)

storeCalled := false
abortCalled := false

sut := WRFile{
File: fh,
store: func(string) error {
storeCalled = true
return nil
},
abort: func() error {
abortCalled = true
return nil
},
}

content := "some content to write to file"
n, err := sut.Write([]byte(content))
require.Equal(t, len(content), n)
require.NoError(t, err)

require.NoError(t, sut.Close())
require.False(t, abortCalled)
require.NoError(t, sut.Abort())
require.False(t, storeCalled)
require.True(t, abortCalled)

require.Error(t, sut.Store("validfilename"))
require.False(t, storeCalled)
}

func TestPyramidReadFile(t *testing.T) {
filename := uuid.New().String()
filepath := path.Join("/tmp", filename)
301 changes: 0 additions & 301 deletions pyramid/mock/pyramid.go

This file was deleted.

8 changes: 6 additions & 2 deletions pyramid/pyramid.go
Original file line number Diff line number Diff line change
@@ -35,7 +35,11 @@ type File interface {
type StoredFile interface {
File

// Store must operate on a closed file.
// Successful operation guarantees that the file is persistent.
// Successful Store operation guarantees that the file is persistent.
// If the file wasn't closed, Store closes it.
Store(filename string) error

// Abort removes all traces of the file from the filesystem.
// It's allowed to call Abort on the file at any stage, unless the file was already stored.
Abort() error
}
3 changes: 3 additions & 0 deletions pyramid/tier_fs.go
Original file line number Diff line number Diff line change
@@ -199,6 +199,9 @@ func (tfs *TierFS) Create(namespace string) (StoredFile, error) {
store: func(filename string) error {
return tfs.store(namespace, tempPath, filename)
},
abort: func() error {
return os.Remove(tempPath)
},
}, nil
}

46 changes: 37 additions & 9 deletions pyramid/wr_file.go
Original file line number Diff line number Diff line change
@@ -12,28 +12,56 @@ type WRFile struct {

persisted bool
store func(string) error
abort func() error
aborted bool
}

var errAlreadyPersisted = errors.New("file is already persisted")
var (
errFilePersisted = errors.New("file is persisted")
errFileAborted = errors.New("file is aborted")
)

// Store copies the closed file to all tiers of the pyramid.
func (f *WRFile) Store(filename string) error {
if f.aborted {
return errFileAborted
}
if f.persisted {
return errFilePersisted
}
f.persisted = true

if err := validateFilename(filename); err != nil {
return err
}

err := f.File.Close()
if err != nil && !errors.Is(err, os.ErrClosed) {
return fmt.Errorf("closing file: %w", err)
if err := f.idempotentClose(); err != nil {
return err
}

return f.store(filename)
}

// Abort delete the file and cleans all traces of it.
// If file was already stored, returns an error.
func (f *WRFile) Abort() error {
if f.persisted {
return errAlreadyPersisted
return errFilePersisted
}
f.aborted = true

err = f.store(filename)
if err == nil {
f.persisted = true
if err := f.idempotentClose(); err != nil {
return err
}

return f.abort()
}

// idempotentClose is like Close(), but doesn't fail when the file is already closed.
func (f *WRFile) idempotentClose() error {
err := f.File.Close()
if err != nil && !errors.Is(err, os.ErrClosed) {
return fmt.Errorf("closing file: %w", err)
}
return err
return nil
}

0 comments on commit 07e7b31

Please sign in to comment.