From 870283f677bbfc80fb9af672a7c22ef50872b7e8 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 10 Oct 2023 22:42:53 +0100 Subject: [PATCH] Fix behaviour on write failures --- segment/vfs_test.go | 21 ++++++++ segment/writer.go | 22 ++++++++ segment/writer_test.go | 114 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+) diff --git a/segment/vfs_test.go b/segment/vfs_test.go index a89738d..2de14e6 100644 --- a/segment/vfs_test.go +++ b/segment/vfs_test.go @@ -6,6 +6,7 @@ package segment import ( "bytes" "encoding/hex" + "errors" "fmt" "io" "os" @@ -162,6 +163,8 @@ type testWritableFile struct { maxWritten int lastSyncStart int closed, dirty bool + writeErr error + syncErr error } func newTestWritableFile(size int) *testWritableFile { @@ -174,6 +177,14 @@ func (f *testWritableFile) getBuf() []byte { return f.buf.Load().([]byte) } +func (f *testWritableFile) failNextWrite() { + f.writeErr = errors.New("IO error") +} + +func (f *testWritableFile) failNextSync() { + f.syncErr = errors.New("IO error") +} + // Truncate allows us to simulate the file being a different length to expected // for example due to a crash. func (f *testWritableFile) Truncate(size int) { @@ -207,6 +218,11 @@ func (f *testWritableFile) Dump() string { } func (f *testWritableFile) WriteAt(p []byte, off int64) (n int, err error) { + if f.writeErr != nil { + err := f.writeErr + f.writeErr = nil + return 0, err + } if !f.dirty { f.lastSyncStart = int(off) } @@ -268,6 +284,11 @@ func (f *testWritableFile) Close() error { } func (f *testWritableFile) Sync() error { + if f.syncErr != nil { + err := f.syncErr + f.syncErr = nil + return err + } f.dirty = false return nil } diff --git a/segment/writer.go b/segment/writer.go index c5a7bd5..1f0a2b2 100644 --- a/segment/writer.go +++ b/segment/writer.go @@ -257,6 +257,26 @@ func (w *Writer) Append(entries []types.LogEntry) error { return types.ErrSealed } + flushed := false + + // Save any state we may need to rollback. + beforeBuf := w.writer.commitBuf + beforeCRC := w.writer.crc + beforeIndexStart := w.writer.indexStart + beforeWriteOffset := w.writer.writeOffset + beforeOffsets := w.offsets.Load() + + defer func() { + if !flushed { + // rollback writer state on error + w.writer.commitBuf = beforeBuf + w.writer.crc = beforeCRC + w.writer.indexStart = beforeIndexStart + w.writer.writeOffset = beforeWriteOffset + w.offsets.Store(beforeOffsets) + } + }() + // Iterate entries and append each one for _, e := range entries { if err := w.appendEntry(e); err != nil { @@ -278,6 +298,8 @@ func (w *Writer) Append(entries []types.LogEntry) error { return err } + flushed = true + // Commit in-memory atomic.StoreUint64(&w.commitIdx, entries[len(entries)-1].Index) return nil diff --git a/segment/writer_test.go b/segment/writer_test.go index dfcddc4..e49824b 100644 --- a/segment/writer_test.go +++ b/segment/writer_test.go @@ -4,6 +4,7 @@ package segment import ( + "fmt" "sync/atomic" "testing" "time" @@ -117,3 +118,116 @@ func TestConcurrentReadersAndWriter(t *testing.T) { require.Greater(t, int(atomic.LoadUint64(&numReads)), 1000) require.Greater(t, int(atomic.LoadUint64(&sealedMaxIndex)), 1000) } + +func TestWriterRecoversFromWriteFailure(t *testing.T) { + cases := []struct { + name string + setupFailure func(f *testWritableFile, batch []types.LogEntry) + fixFailure func(batch []types.LogEntry) + }{ + { + name: "fwrite failure", + setupFailure: func(f *testWritableFile, batch []types.LogEntry) { + f.failNextWrite() + }, + }, + { + name: "fsync failure", + setupFailure: func(f *testWritableFile, batch []types.LogEntry) { + f.failNextSync() + }, + }, + { + name: "log append failure", + setupFailure: func(f *testWritableFile, batch []types.LogEntry) { + // Should cause monotonicity check to fail but only on last log after + // other logs have been written and internal state updated. + batch[len(batch)-1].Index = 123456 + }, + fixFailure: func(batch []types.LogEntry) { + batch[len(batch)-1].Index = batch[len(batch)-2].Index + 1 + }, + }, + } + + for _, tc := range cases { + tc := tc + + testFn := func(t *testing.T, empty bool) { + vfs := newTestVFS() + + f := NewFiler("test", vfs) + + seg0 := testSegment(1) + + w, err := f.Create(seg0) + require.NoError(t, err) + defer w.Close() + + batch := make([]types.LogEntry, 5) + for i := range batch { + batch[i].Index = uint64(i + 1) + batch[i].Data = []byte(fmt.Sprintf("val-%d", i+1)) + } + maxIdx := len(batch) + expectFirstIdx := 0 + expectLastIdx := 0 + + if !empty { + require.NoError(t, w.Append(batch)) + expectFirstIdx = 1 + expectLastIdx = maxIdx + for i := range batch { + batch[i].Index = uint64(i + maxIdx + 1) + batch[i].Data = []byte(fmt.Sprintf("val-%d", i+maxIdx+1)) + } + } + + tf := testFileFor(t, w) + + tc.setupFailure(tf, batch) + + require.Error(t, w.Append(batch)) + assertExpectedLogs(t, w, expectFirstIdx, expectLastIdx) + + if tc.fixFailure != nil { + tc.fixFailure(batch) + } + + // Now retry that write, it should work! + expectFirstIdx = 1 + expectLastIdx = int(batch[4].Index) + require.NoError(t, w.Append(batch)) + assertExpectedLogs(t, w, expectFirstIdx, expectLastIdx) + + // Also, re-open the file "from disk" to make sure what has been written + // is correct and recoverable! + w2, err := f.RecoverTail(seg0) + require.NoError(t, err) + assertExpectedLogs(t, w2, expectFirstIdx, expectLastIdx) + w2.Close() + } + + t.Run(tc.name+" empty", func(t *testing.T) { + testFn(t, true) + }) + t.Run(tc.name+" non-empty", func(t *testing.T) { + testFn(t, false) + }) + } +} + +func assertExpectedLogs(t *testing.T, w types.SegmentWriter, first, last int) { + t.Helper() + + require.Equal(t, uint64(last), w.LastIndex()) + if last == 0 { + return + } + for idx := first; idx <= last; idx++ { + buf, err := w.GetLog(uint64(idx)) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("val-%d", idx), string(buf.Bs)) + buf.Close() + } +}