From a874ed643ea02f136c5520e639598d7a2698bad3 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Tue, 14 May 2024 16:37:40 +0400 Subject: [PATCH] feat: add persistence support Add support to persist and load buffer from disk. Co-authored-by: Utku Ozdemir Signed-off-by: Andrey Smirnov --- chunk.go | 3 +- circular.go | 15 ++++ options.go | 49 ++++++++++- persistence.go | 203 ++++++++++++++++++++++++++++++++++++++++++++ persistence_test.go | 118 +++++++++++++++++++++++++ zstd/zstd.go | 21 +++++ zstd/zstd_test.go | 61 +++++++++++++ 7 files changed, 468 insertions(+), 2 deletions(-) create mode 100644 persistence.go create mode 100644 persistence_test.go create mode 100644 zstd/zstd_test.go diff --git a/chunk.go b/chunk.go index 5bd966a..2852743 100644 --- a/chunk.go +++ b/chunk.go @@ -11,5 +11,6 @@ type chunk struct { startOffset int64 // uncompressed size of the chunk size int64 - // [TODO]: have a unique (incrementing?) chunk ID for file-based storage + // unique chunk ID + id int64 } diff --git a/circular.go b/circular.go index 6cbceb3..14177c1 100644 --- a/circular.go +++ b/circular.go @@ -12,6 +12,10 @@ import ( "sync" ) +/// Buffer ---> (( new chunk X + any V, delete chunk Y + any V )) ---> Persister + +// Close(): ^^^ stop this channel ; return any chunks to persist? + // Buffer implements circular buffer with a thread-safe writer, // that supports multiple readers each with its own offset. 
type Buffer struct { @@ -59,6 +63,10 @@ func NewBuffer(opts ...OptionFunc) (*Buffer, error) { buf.data = make([]byte, buf.opt.InitialCapacity) buf.cond = sync.NewCond(&buf.mu) + if err := buf.load(); err != nil { + return nil, err + } + return buf, nil } @@ -122,10 +130,17 @@ func (buf *Buffer) Write(p []byte) (int, error) { return n, err } + var maxID int64 + + for _, c := range buf.chunks { + maxID = max(c.id, maxID) + } + buf.chunks = append(buf.chunks, chunk{ compressed: compressed, startOffset: buf.off - int64(buf.opt.MaxCapacity), size: int64(buf.opt.MaxCapacity), + id: maxID + 1, }) if len(buf.chunks) > buf.opt.NumCompressedChunks { diff --git a/options.go b/options.go index 7adb283..31aadae 100644 --- a/options.go +++ b/options.go @@ -4,12 +4,17 @@ package circular -import "fmt" +import ( + "fmt" + "time" +) // Options defines settings for Buffer. type Options struct { Compressor Compressor + PersistenceOptions PersistenceOptions + InitialCapacity int MaxCapacity int SafetyGap int @@ -17,12 +22,33 @@ type Options struct { NumCompressedChunks int } +// PersistenceOptions defines settings for Buffer persistence. +type PersistenceOptions struct { + // ChunkPath is the base path to the store chunk files. + // + // Example: /var/log/machine/my-machine.log, chunks will be stored + // by appending a chunk ID to this path, e.g. /var/log/machine/my-machine.log.3. + // + // If ChunkPath is empty, persistence is disabled. + ChunkPath string + + // FlushInterval flushes buffer content to disk every FlushInterval (if there were any changes). + FlushInterval time.Duration + + // FlushJitter adds random jitter to FlushInterval to avoid thundering herd problem (a ratio of FlushInterval). + FlushJitter float64 +} + // Compressor implements an optional interface for chunk compression. // // Compress and Decompress append to the dest slice and return the result. +// +// Compressor should be safe for concurrent use by multiple goroutines. 
+// Compressor should verify checksums of the compressed data. type Compressor interface { Compress(src, dest []byte) ([]byte, error) Decompress(src, dest []byte) ([]byte, error) + DecompressedSize(src []byte) (int64, error) } // defaultOptions returns default initial values. @@ -95,3 +121,24 @@ func WithNumCompressedChunks(num int, c Compressor) OptionFunc { return nil } } + +// WithPersistence enables buffer persistence to disk. +func WithPersistence(options PersistenceOptions) OptionFunc { + return func(opt *Options) error { + if options.ChunkPath == "" { + return fmt.Errorf("chunk path should be set") + } + + if options.FlushJitter < 0 || options.FlushJitter > 1 { + return fmt.Errorf("flush jitter should be in range [0, 1]: %f", options.FlushJitter) + } + + if opt.Compressor == nil { + return fmt.Errorf("compressor should be set for persistence") + } + + opt.PersistenceOptions = options + + return nil + } +} diff --git a/persistence.go b/persistence.go new file mode 100644 index 0000000..038b67a --- /dev/null +++ b/persistence.go @@ -0,0 +1,203 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package circular + +import ( + "cmp" + "context" + "math/rand" + "os" + "path/filepath" + "slices" + "strconv" + "strings" + "time" +) + +func (buf *Buffer) load() error { + if buf.opt.PersistenceOptions.ChunkPath == "" { + // persistence is disabled + return nil + } + + chunkPaths, err := filepath.Glob(buf.opt.PersistenceOptions.ChunkPath + ".*") + if err != nil { + return err + } + + type parsedChunkPath struct { + path string + id int64 + } + + parsedChunkPaths := make([]parsedChunkPath, 0, len(chunkPaths)) + + for _, chunkPath := range chunkPaths { + idx := strings.LastIndexByte(chunkPath, '.') + if idx == -1 { + continue + } + + idStr := chunkPath[idx+1:] + + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id < 0 { + continue + } + + parsedChunkPaths = append(parsedChunkPaths, parsedChunkPath{ + id: id, + path: chunkPath, + }) + } + + // sort chunks by ID, from smallest to biggest + slices.SortFunc(parsedChunkPaths, func(a, b parsedChunkPath) int { + return cmp.Compare(a.id, b.id) + }) + + idx := 0 + + if len(parsedChunkPaths) > 1 && parsedChunkPaths[0].id == 0 { + idx = 1 + } + + if len(parsedChunkPaths)-idx > buf.opt.NumCompressedChunks { + for j := idx; j < len(parsedChunkPaths)-buf.opt.NumCompressedChunks; j++ { + if err := os.Remove(parsedChunkPaths[j].path); err != nil { + // failed to remove the chunk + continue + } + } + + parsedChunkPaths = slices.Delete(parsedChunkPaths, idx, len(parsedChunkPaths)-buf.opt.NumCompressedChunks) + } + + chunks := make([]chunk, 0, len(parsedChunkPaths)) + + for _, chunkPath := range parsedChunkPaths { + data, err := os.ReadFile(chunkPath.path) + if err != nil { + // failed to read the chunk + continue + } + + if chunkPath.id == 0 { + buf.data, err = buf.opt.Compressor.Decompress(data, buf.data[:0]) + if err != nil { + // failed to decompress the data + buf.data = buf.data[:cap(buf.data)] + + continue + } + + buf.off = int64(len(buf.data)) + buf.data = buf.data[:cap(buf.data)] + } else { + 
decompressedSize, err := buf.opt.Compressor.DecompressedSize(data) + if err != nil { + // failed to get the decompressed size + continue + } + + chunks = append(chunks, + chunk{ + compressed: data, + id: chunkPath.id, + size: decompressedSize, + }) + } + } + + // re-calculate all offsets + var sizeCompressed int64 + + for i := range chunks { + sizeCompressed += chunks[i].size + } + + // if chunk sizes are [10, 30, 20], the offsets will be [-60, -50, -20]. + // the current buffer starts at 0 and goes to b.off (size of the buffer). + for i := range chunks { + chunks[i].startOffset = -sizeCompressed + sizeCompressed -= chunks[i].size + } + + buf.chunks = chunks + + return nil +} + +type persistenceCommand struct { + chunkID int64 + drop bool + + data []byte +} + +func (buf *Buffer) chunkPath(chunkID int64) string { + return buf.opt.PersistenceOptions.ChunkPath + "." + strconv.FormatInt(chunkID, 10) +} + +func (buf *Buffer) runPersistence(ctx context.Context, ch <-chan persistenceCommand) { + var ( + timerC <-chan time.Time + timer *time.Timer + ) + + defer func() { + if timer == nil { + return + } + + if !timer.Stop() { + <-timer.C + } + }() + + setTimer := func() { + interval := time.Duration(((rand.Float64()*2-1)*buf.opt.PersistenceOptions.FlushJitter + 1.0) * float64(buf.opt.PersistenceOptions.FlushInterval)) + + if timer == nil { + timer = time.NewTimer(interval) + timerC = timer.C + } else { + timer.Reset(interval) + } + } + + if buf.opt.PersistenceOptions.FlushInterval > 0 { + setTimer() + } + + for { + select { + case <-ctx.Done(): + return + case command := <-ch: + if command.drop { + os.Remove(buf.chunkPath(command.chunkID)) + } else { + os.WriteFile(buf.chunkPath(command.chunkID), command.data, 0o644) + } + case <-timerC: + // persist current chunk if changed + + // if Changed + buf.mu.Lock() + data := slices.Clone(buf.data[:buf.off%int64(cap(buf.data))]) + buf.mu.Unlock() + + compressed, err := buf.opt.Compressor.Compress(data, nil) + if err != nil { + // 
compression of the current chunk failed — BUG(review): the error is silently ignored and os.WriteFile below still runs, persisting an empty/partial chunk; this should be logged or the write skipped + } + + os.WriteFile(buf.chunkPath(0), compressed, 0o644) + + setTimer() + } + } +} diff --git a/persistence_test.go b/persistence_test.go new file mode 100644 index 0000000..0891500 --- /dev/null +++ b/persistence_test.go @@ -0,0 +1,118 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package circular_test + +import ( + "bytes" + "io" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/siderolabs/gen/xtesting/must" + "github.com/stretchr/testify/require" + + "github.com/siderolabs/go-circular" + "github.com/siderolabs/go-circular/zstd" +) + +func TestLoad(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + + options []circular.OptionFunc + + chunkSizes []int + + numCompressedChunks int + }{ + { + name: "no chunks", + + numCompressedChunks: 5, + }, + { + name: "expected number of chunks", + + numCompressedChunks: 5, + + chunkSizes: []int{1024, 2048, 1024, 1024, 3072, 4096}, + }, + { + name: "less chunks than expected", + + numCompressedChunks: 5, + + chunkSizes: []int{10240, 20480}, + }, + { + name: "more chunks than expected", + + numCompressedChunks: 4, + + chunkSizes: []int{10240, 2048, 1024, 1024, 3072, 4096}, + }, + { + name: "single chunk", + + numCompressedChunks: 2, + + chunkSizes: []int{10240}, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + + compressor := must.Value(zstd.NewCompressor())(t) + + for i, size := range test.chunkSizes { + chunkData := bytes.Repeat([]byte{byte(i)}, size) + chunkData = must.Value(compressor.Compress(chunkData, nil))(t) + + chunkPath := filepath.Join(dir, "chunk."+strconv.Itoa(i)) + + require.NoError(t, os.WriteFile(chunkPath, chunkData, 0o644)) + } + + buf, err := circular.NewBuffer(append(test.options, + circular.WithNumCompressedChunks(test.numCompressedChunks, compressor), 
+ circular.WithPersistence(circular.PersistenceOptions{ + ChunkPath: filepath.Join(dir, "chunk"), + }), + )...) + require.NoError(t, err) + + actualData, err := io.ReadAll(buf.GetReader()) + require.NoError(t, err) + + var expectedData []byte + + firstIdx := 1 + + if len(test.chunkSizes) > test.numCompressedChunks+1 { + firstIdx = len(test.chunkSizes) - test.numCompressedChunks + } + + for i := firstIdx; i < firstIdx-1+min(len(test.chunkSizes), test.numCompressedChunks+1); i++ { + expectedData = append(expectedData, bytes.Repeat([]byte{byte(i)}, test.chunkSizes[i])...) + } + + if len(test.chunkSizes) > 0 { + expectedData = append(expectedData, bytes.Repeat([]byte{0}, test.chunkSizes[0])...) + } + + if expectedData == nil { + expectedData = []byte{} + } + + require.Equal(t, expectedData, actualData) + }) + } +} diff --git a/zstd/zstd.go b/zstd/zstd.go index 08c5bda..8a6b1b1 100644 --- a/zstd/zstd.go +++ b/zstd/zstd.go @@ -6,6 +6,8 @@ package zstd import ( + "errors" + "github.com/klauspost/compress/zstd" ) @@ -42,3 +44,22 @@ func (c *Compressor) Compress(src, dest []byte) ([]byte, error) { func (c *Compressor) Decompress(src, dest []byte) ([]byte, error) { return c.dec.DecodeAll(src, dest) } + +// DecompressedSize returns the size of the decompressed data. +func (c *Compressor) DecompressedSize(src []byte) (int64, error) { + if len(src) == 0 { + return 0, nil + } + + var header zstd.Header + + if err := header.Decode(src); err != nil { + return 0, err + } + + if header.HasFCS { + return int64(header.FrameContentSize), nil + } + + return 0, errors.New("frame content size is not set") +} diff --git a/zstd/zstd_test.go b/zstd/zstd_test.go new file mode 100644 index 0000000..2a7d292 --- /dev/null +++ b/zstd/zstd_test.go @@ -0,0 +1,61 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package zstd_test + +import ( + "crypto/rand" + "io" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/siderolabs/go-circular/zstd" +) + +func TestCompressor(t *testing.T) { + t.Parallel() + + compressor, err := zstd.NewCompressor() + require.NoError(t, err) + + for _, test := range []struct { + size int + }{ + { + size: 0, + }, + { + size: 1024, + }, + { + size: 1024 * 1024, + }, + } { + t.Run(strconv.Itoa(test.size), func(t *testing.T) { + t.Parallel() + + data, err := io.ReadAll(io.LimitReader(rand.Reader, int64(test.size))) + require.NoError(t, err) + + compressed, err := compressor.Compress(data, nil) + require.NoError(t, err) + + decompressed, err := compressor.Decompress(compressed, nil) + require.NoError(t, err) + + if len(data) == 0 { + data = nil + } + + require.Equal(t, data, decompressed) + + decompressedSize, err := compressor.DecompressedSize(compressed) + require.NoError(t, err) + + require.Equal(t, int64(len(data)), decompressedSize) + }) + } +}