Skip to content

Commit

Permalink
Fix behaviour on write failures
Browse files Browse the repository at this point in the history
  • Loading branch information
banks committed Oct 10, 2023
1 parent c5b57bc commit 870283f
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 0 deletions.
21 changes: 21 additions & 0 deletions segment/vfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package segment
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -162,6 +163,8 @@ type testWritableFile struct {
maxWritten int
lastSyncStart int
closed, dirty bool
writeErr error
syncErr error
}

func newTestWritableFile(size int) *testWritableFile {
Expand All @@ -174,6 +177,14 @@ func (f *testWritableFile) getBuf() []byte {
return f.buf.Load().([]byte)
}

// failNextWrite arms the fake file so that its next WriteAt call
// returns an error (the error is one-shot; WriteAt clears it).
func (f *testWritableFile) failNextWrite() {
	injected := errors.New("IO error")
	f.writeErr = injected
}

// failNextSync arms the fake file so that its next Sync call
// returns an error (the error is one-shot; Sync clears it).
func (f *testWritableFile) failNextSync() {
	injected := errors.New("IO error")
	f.syncErr = injected
}

// Truncate allows us to simulate the file being a different length to expected
// for example due to a crash.
func (f *testWritableFile) Truncate(size int) {
Expand Down Expand Up @@ -207,6 +218,11 @@ func (f *testWritableFile) Dump() string {
}

func (f *testWritableFile) WriteAt(p []byte, off int64) (n int, err error) {
if f.writeErr != nil {
err := f.writeErr
f.writeErr = nil
return 0, err
}
if !f.dirty {
f.lastSyncStart = int(off)
}
Expand Down Expand Up @@ -268,6 +284,11 @@ func (f *testWritableFile) Close() error {
}

// Sync simulates an fsync. If a failure was armed via failNextSync it
// consumes that error and returns it, leaving the file dirty; otherwise
// it marks all buffered writes as durable.
func (f *testWritableFile) Sync() error {
	// Consume any armed one-shot error before touching dirty state.
	err := f.syncErr
	f.syncErr = nil
	if err != nil {
		// Failed sync: the dirty flag stays set.
		return err
	}
	f.dirty = false
	return nil
}
22 changes: 22 additions & 0 deletions segment/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,26 @@ func (w *Writer) Append(entries []types.LogEntry) error {
return types.ErrSealed
}

flushed := false

// Save any state we may need to rollback.
beforeBuf := w.writer.commitBuf
beforeCRC := w.writer.crc
beforeIndexStart := w.writer.indexStart
beforeWriteOffset := w.writer.writeOffset
beforeOffsets := w.offsets.Load()

defer func() {
if !flushed {
// rollback writer state on error
w.writer.commitBuf = beforeBuf
w.writer.crc = beforeCRC
w.writer.indexStart = beforeIndexStart
w.writer.writeOffset = beforeWriteOffset
w.offsets.Store(beforeOffsets)
}
}()

// Iterate entries and append each one
for _, e := range entries {
if err := w.appendEntry(e); err != nil {
Expand All @@ -278,6 +298,8 @@ func (w *Writer) Append(entries []types.LogEntry) error {
return err
}

flushed = true

// Commit in-memory
atomic.StoreUint64(&w.commitIdx, entries[len(entries)-1].Index)
return nil
Expand Down
114 changes: 114 additions & 0 deletions segment/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package segment

import (
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -117,3 +118,116 @@ func TestConcurrentReadersAndWriter(t *testing.T) {
require.Greater(t, int(atomic.LoadUint64(&numReads)), 1000)
require.Greater(t, int(atomic.LoadUint64(&sealedMaxIndex)), 1000)
}

// TestWriterRecoversFromWriteFailure exercises Append's rollback path: after
// an injected write, sync, or monotonicity failure, the writer's state must be
// restored so that retrying the same batch succeeds, and the on-disk segment
// must remain recoverable. Each case runs against both an empty and a
// pre-populated segment.
func TestWriterRecoversFromWriteFailure(t *testing.T) {
	cases := []struct {
		name         string
		setupFailure func(f *testWritableFile, batch []types.LogEntry)
		fixFailure   func(batch []types.LogEntry)
	}{
		{
			name: "fwrite failure",
			setupFailure: func(f *testWritableFile, _ []types.LogEntry) {
				f.failNextWrite()
			},
		},
		{
			name: "fsync failure",
			setupFailure: func(f *testWritableFile, _ []types.LogEntry) {
				f.failNextSync()
			},
		},
		{
			name: "log append failure",
			setupFailure: func(_ *testWritableFile, batch []types.LogEntry) {
				// Should cause monotonicity check to fail but only on last log after
				// other logs have been written and internal state updated.
				batch[len(batch)-1].Index = 123456
			},
			fixFailure: func(batch []types.LogEntry) {
				batch[len(batch)-1].Index = batch[len(batch)-2].Index + 1
			},
		},
	}

	for _, tc := range cases {
		tc := tc

		run := func(t *testing.T, startEmpty bool) {
			vfs := newTestVFS()

			f := NewFiler("test", vfs)

			seg0 := testSegment(1)

			w, err := f.Create(seg0)
			require.NoError(t, err)
			defer w.Close()

			// Build a batch of five entries indexed 1..5 with payloads
			// "val-1".."val-5".
			const batchLen = 5
			batch := make([]types.LogEntry, batchLen)
			for i := range batch {
				batch[i].Index = uint64(i + 1)
				batch[i].Data = []byte(fmt.Sprintf("val-%d", i+1))
			}

			wantFirst, wantLast := 0, 0
			if !startEmpty {
				// Pre-populate the segment with the first batch, then
				// re-number the batch to cover the next five indices.
				require.NoError(t, w.Append(batch))
				wantFirst, wantLast = 1, batchLen
				for i := range batch {
					batch[i].Index = uint64(i + batchLen + 1)
					batch[i].Data = []byte(fmt.Sprintf("val-%d", i+batchLen+1))
				}
			}

			tf := testFileFor(t, w)

			tc.setupFailure(tf, batch)

			// The sabotaged append must fail and leave previously committed
			// logs intact.
			require.Error(t, w.Append(batch))
			assertExpectedLogs(t, w, wantFirst, wantLast)

			if tc.fixFailure != nil {
				tc.fixFailure(batch)
			}

			// Now retry that write, it should work!
			wantFirst = 1
			wantLast = int(batch[batchLen-1].Index)
			require.NoError(t, w.Append(batch))
			assertExpectedLogs(t, w, wantFirst, wantLast)

			// Also, re-open the file "from disk" to make sure what has been
			// written is correct and recoverable!
			w2, err := f.RecoverTail(seg0)
			require.NoError(t, err)
			assertExpectedLogs(t, w2, wantFirst, wantLast)
			w2.Close()
		}

		t.Run(tc.name+" empty", func(t *testing.T) {
			run(t, true)
		})
		t.Run(tc.name+" non-empty", func(t *testing.T) {
			run(t, false)
		})
	}
}

// assertExpectedLogs verifies that w reports last as its highest index and
// that every log in [first, last] carries the payload the tests wrote for it
// ("val-<idx>"). A last of zero means the segment is expected to be empty.
func assertExpectedLogs(t *testing.T, w types.SegmentWriter, first, last int) {
	t.Helper()

	require.Equal(t, uint64(last), w.LastIndex())
	if last == 0 {
		// Nothing committed yet; no payloads to check.
		return
	}
	for i := first; i <= last; i++ {
		got, err := w.GetLog(uint64(i))
		require.NoError(t, err)
		want := fmt.Sprintf("val-%d", i)
		require.Equal(t, want, string(got.Bs))
		got.Close()
	}
}

0 comments on commit 870283f

Please sign in to comment.