From 55734a3652241f1505c1e57c998d2d203833bdd6 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Tue, 14 May 2024 16:37:40 +0400 Subject: [PATCH] feat: add persistence support Add support to persist and load buffer from disk. Co-authored-by: Utku Ozdemir Signed-off-by: Andrey Smirnov --- .dockerignore | 6 +- .golangci.yml | 4 +- Dockerfile | 6 +- chunk.go | 3 +- circular.go | 63 +++++++++ circular_bench_test.go | 3 +- circular_test.go | 1 - go.mod | 3 + go.sum | 13 +- options.go | 71 +++++++++- options_test.go | 35 +++++ persistence.go | 301 +++++++++++++++++++++++++++++++++++++++++ persistence_test.go | 298 ++++++++++++++++++++++++++++++++++++++++ zstd/zstd.go | 21 +++ zstd/zstd_test.go | 61 +++++++++ 15 files changed, 879 insertions(+), 10 deletions(-) create mode 100644 options_test.go create mode 100644 persistence.go create mode 100644 persistence_test.go create mode 100644 zstd/zstd_test.go diff --git a/.dockerignore b/.dockerignore index f2ec6e0..41deeae 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,14 +1,18 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-05-08T13:07:24Z by kres 1e986af. +# Generated on 2024-05-16T11:12:28Z by kres ce88e1c. * !zstd !chunk.go !circular.go +!circular_bench_test.go !circular_test.go !errors.go !options.go +!options_test.go +!persistence.go +!persistence_test.go !reader.go !go.mod !go.sum diff --git a/.golangci.yml b/.golangci.yml index a30d5e8..76582ca 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-05-08T11:26:09Z by kres 1e986af. +# Generated on 2024-05-16T11:12:28Z by kres ce88e1c. # options for analysis running run: @@ -35,7 +35,7 @@ linters-settings: sections: - standard # Standard section: captures all standard packages. - default # Default section: contains all imports that could not be matched to another section type. 
- - prefix(github.com/siderolabs/go-circular/) # Custom section: groups all imports with the specified Prefix. + - localmodule gocognit: min-complexity: 30 nestif: diff --git a/Dockerfile b/Dockerfile index 078964b..901e07b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-05-08T13:07:24Z by kres 1e986af. +# Generated on 2024-05-16T11:12:28Z by kres ce88e1c. ARG TOOLCHAIN @@ -58,9 +58,13 @@ RUN --mount=type=cache,target=/go/pkg go mod verify COPY ./zstd ./zstd COPY ./chunk.go ./chunk.go COPY ./circular.go ./circular.go +COPY ./circular_bench_test.go ./circular_bench_test.go COPY ./circular_test.go ./circular_test.go COPY ./errors.go ./errors.go COPY ./options.go ./options.go +COPY ./options_test.go ./options_test.go +COPY ./persistence.go ./persistence.go +COPY ./persistence_test.go ./persistence_test.go COPY ./reader.go ./reader.go RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null diff --git a/chunk.go b/chunk.go index 5bd966a..2852743 100644 --- a/chunk.go +++ b/chunk.go @@ -11,5 +11,6 @@ type chunk struct { startOffset int64 // uncompressed size of the chunk size int64 - // [TODO]: have a unique (incrementing?) 
chunk ID for file-based storage + // unique chunk ID + id int64 } diff --git a/circular.go b/circular.go index 6cbceb3..df88ff6 100644 --- a/circular.go +++ b/circular.go @@ -10,6 +10,7 @@ import ( "math" "slices" "sync" + "sync/atomic" ) // Buffer implements circular buffer with a thread-safe writer, @@ -18,6 +19,9 @@ type Buffer struct { // waking up streaming readers on new writes cond *sync.Cond + // channel for persistence commands from the writer to the persistence goroutine + commandCh chan persistenceCommand + // compressed chunks, ordered from the smallest offset to the largest chunks []chunk @@ -28,6 +32,12 @@ type Buffer struct { // buffer options opt Options + // waitgroup to wait for persistence goroutine to finish + wg sync.WaitGroup + + // closed flag (to disable writes after close) + closed atomic.Bool + // synchronizing access to data, off, chunks mu sync.Mutex @@ -59,16 +69,43 @@ func NewBuffer(opts ...OptionFunc) (*Buffer, error) { buf.data = make([]byte, buf.opt.InitialCapacity) buf.cond = sync.NewCond(&buf.mu) + if err := buf.load(); err != nil { + return nil, err + } + + buf.run() + return buf, nil } +// Close closes the buffer and waits for persistence goroutine to finish. +func (buf *Buffer) Close() error { + if buf.closed.Swap(true) { + return nil + } + + if buf.commandCh != nil { + close(buf.commandCh) + } + + buf.wg.Wait() + + return nil +} + // Write implements io.Writer interface. 
+// +//nolint:gocognit func (buf *Buffer) Write(p []byte) (int, error) { l := len(p) if l == 0 { return 0, nil } + if buf.closed.Load() { + return 0, ErrClosed + } + buf.mu.Lock() defer buf.mu.Unlock() @@ -122,13 +159,34 @@ func (buf *Buffer) Write(p []byte) (int, error) { return n, err } + var maxID int64 + + for _, c := range buf.chunks { + maxID = max(c.id, maxID) + } + buf.chunks = append(buf.chunks, chunk{ compressed: compressed, startOffset: buf.off - int64(buf.opt.MaxCapacity), size: int64(buf.opt.MaxCapacity), + id: maxID + 1, }) + if buf.commandCh != nil { + buf.commandCh <- persistenceCommand{ + chunkID: maxID + 1, + data: compressed, + } + } + if len(buf.chunks) > buf.opt.NumCompressedChunks { + if buf.commandCh != nil { + buf.commandCh <- persistenceCommand{ + chunkID: buf.chunks[0].id, + drop: true, + } + } + buf.chunks = slices.Delete(buf.chunks, 0, 1) } } @@ -147,6 +205,11 @@ func (buf *Buffer) Capacity() int { return cap(buf.data) } +// MaxCapacity returns maximum number of (decompressed) bytes (including compressed chunks) that can be stored in the buffer. +func (buf *Buffer) MaxCapacity() int { + return buf.opt.MaxCapacity * (buf.opt.NumCompressedChunks + 1) +} + // NumCompressedChunks returns number of compressed chunks. func (buf *Buffer) NumCompressedChunks() int { buf.mu.Lock() diff --git a/circular_bench_test.go b/circular_bench_test.go index 9702408..e6e3a7b 100644 --- a/circular_bench_test.go +++ b/circular_bench_test.go @@ -2,9 +2,10 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+//go:build !race + package circular_test -//nolint:gci import ( "crypto/rand" "io" diff --git a/circular_test.go b/circular_test.go index dac913a..1bb0834 100644 --- a/circular_test.go +++ b/circular_test.go @@ -4,7 +4,6 @@ package circular_test -//nolint:gci import ( "bytes" "context" diff --git a/go.mod b/go.mod index 9f5a54f..04b4e22 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( github.com/klauspost/compress v1.17.8 github.com/siderolabs/gen v0.4.8 github.com/stretchr/testify v1.9.0 + go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 golang.org/x/sync v0.7.0 golang.org/x/time v0.5.0 ) @@ -13,5 +15,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.10.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index dc77d29..70dafe0 100644 --- a/go.sum +++ b/go.sum @@ -2,17 +2,28 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/siderolabs/gen v0.4.8 h1:VNpbmDLhkXp7qcSEkKk1Ee7vU2afs3xvHrWLGR2UuiY= github.com/siderolabs/gen v0.4.8/go.mod h1:7ROKVHHB68R3Amrd4a1ZXz/oMrXWF3Mg3lSEgnkJY5c= github.com/stretchr/testify v1.9.0 
h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go index 7adb283..978eb86 100644 --- a/options.go +++ b/options.go @@ -4,12 +4,22 @@ package circular -import "fmt" +import ( + "fmt" + "math/rand/v2" + "time" + + "go.uber.org/zap" +) // Options defines settings for Buffer. 
type Options struct { Compressor Compressor + Logger *zap.Logger + + PersistenceOptions PersistenceOptions + InitialCapacity int MaxCapacity int SafetyGap int @@ -17,12 +27,38 @@ type Options struct { NumCompressedChunks int } +// PersistenceOptions defines settings for Buffer persistence. +type PersistenceOptions struct { + // ChunkPath is the base path used to store chunk files. + // + // Example: /var/log/machine/my-machine.log, chunks will be stored + // by appending a chunk ID to this path, e.g. /var/log/machine/my-machine.log.3. + // + // If ChunkPath is empty, persistence is disabled. + ChunkPath string + + // FlushInterval flushes buffer content to disk every FlushInterval (if there were any changes). + FlushInterval time.Duration + + // FlushJitter adds random jitter to FlushInterval to avoid thundering herd problem (a ratio of FlushInterval). + FlushJitter float64 +} + +// NextInterval calculates next flush interval with jitter. +func (p PersistenceOptions) NextInterval() time.Duration { + return time.Duration(((rand.Float64()*2-1)*p.FlushJitter + 1.0) * float64(p.FlushInterval)) +} + // Compressor implements an optional interface for chunk compression. // // Compress and Decompress append to the dest slice and return the result. +// +// Compressor should be safe for concurrent use by multiple goroutines. +// Compressor should verify checksums of the compressed data. type Compressor interface { Compress(src, dest []byte) ([]byte, error) Decompress(src, dest []byte) ([]byte, error) + DecompressedSize(src []byte) (int64, error) } // defaultOptions returns default initial values. 
@@ -31,6 +67,7 @@ func defaultOptions() Options { InitialCapacity: 16384, MaxCapacity: 1048576, SafetyGap: 1024, + Logger: zap.NewNop(), } } @@ -71,7 +108,7 @@ func WithMaxCapacity(capacity int) OptionFunc { func WithSafetyGap(gap int) OptionFunc { return func(opt *Options) error { if gap <= 0 { - return fmt.Errorf("safety gap should be positive: %q", gap) + return fmt.Errorf("safety gap should be positive: %d", gap) } opt.SafetyGap = gap @@ -95,3 +132,33 @@ func WithNumCompressedChunks(num int, c Compressor) OptionFunc { return nil } } + +// WithPersistence enables buffer persistence to disk. +func WithPersistence(options PersistenceOptions) OptionFunc { + return func(opt *Options) error { + if options.ChunkPath == "" { + return fmt.Errorf("chunk path should be set") + } + + if options.FlushJitter < 0 || options.FlushJitter > 1 { + return fmt.Errorf("flush jitter should be in range [0, 1]: %f", options.FlushJitter) + } + + if opt.Compressor == nil { + return fmt.Errorf("compressor should be set for persistence") + } + + opt.PersistenceOptions = options + + return nil + } +} + +// WithLogger sets logger for Buffer. +func WithLogger(logger *zap.Logger) OptionFunc { + return func(opt *Options) error { + opt.Logger = logger + + return nil + } +} diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..6fee9a9 --- /dev/null +++ b/options_test.go @@ -0,0 +1,35 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package circular_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/siderolabs/go-circular" +) + +func TestNextInterval(t *testing.T) { + t.Parallel() + + opts := circular.PersistenceOptions{ + FlushInterval: 10 * time.Second, + FlushJitter: 0.1, + } + + var previous time.Duration + + for range 100 { + interval := opts.NextInterval() + + assert.NotEqual(t, previous, interval) + + previous = interval + + assert.InDelta(t, opts.FlushInterval, interval, 0.1*float64(opts.FlushInterval)) + } +} diff --git a/persistence.go b/persistence.go new file mode 100644 index 0000000..3c7eba8 --- /dev/null +++ b/persistence.go @@ -0,0 +1,301 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package circular + +import ( + "cmp" + "fmt" + "io/fs" + "os" + "path/filepath" + "slices" + "strconv" + "strings" + "time" + + "github.com/siderolabs/gen/xslices" + "go.uber.org/zap" +) + +//nolint:gocognit +func (buf *Buffer) load() error { + if buf.opt.PersistenceOptions.ChunkPath == "" { + // persistence is disabled + return nil + } + + chunkPaths, err := filepath.Glob(buf.opt.PersistenceOptions.ChunkPath + ".*") + if err != nil { + return err + } + + type parsedChunkPath struct { + path string + id int64 + } + + parsedChunkPaths := make([]parsedChunkPath, 0, len(chunkPaths)) + + for _, chunkPath := range chunkPaths { + idx := strings.LastIndexByte(chunkPath, '.') + if idx == -1 { + continue + } + + idStr := chunkPath[idx+1:] + + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id < 0 { + continue + } + + parsedChunkPaths = append(parsedChunkPaths, parsedChunkPath{ + id: id, + path: chunkPath, + }) + } + + // sort chunks by ID, from smallest to biggest + slices.SortFunc(parsedChunkPaths, func(a, b parsedChunkPath) int { + return cmp.Compare(a.id, b.id) + }) + + idx := 0 + + 
if len(parsedChunkPaths) > 1 && parsedChunkPaths[0].id == 0 { + idx = 1 + } + + if len(parsedChunkPaths)-idx > buf.opt.NumCompressedChunks { + for j := idx; j < len(parsedChunkPaths)-buf.opt.NumCompressedChunks; j++ { + buf.opt.Logger.Warn("dropping chunk, as it is beyond the limit of available chunks", zap.String("path", parsedChunkPaths[j].path)) + + if err := os.Remove(parsedChunkPaths[j].path); err != nil { + buf.opt.Logger.Error("failed to remove chunk", zap.String("path", parsedChunkPaths[j].path), zap.Error(err)) + + continue + } + } + + parsedChunkPaths = slices.Delete(parsedChunkPaths, idx, len(parsedChunkPaths)-buf.opt.NumCompressedChunks) + } + + chunks := make([]chunk, 0, len(parsedChunkPaths)) + + for _, chunkPath := range parsedChunkPaths { + data, err := os.ReadFile(chunkPath.path) + if err != nil { + buf.opt.Logger.Error("failed to read chunk, skipping", zap.String("path", chunkPath.path), zap.Error(err)) + + continue + } + + if chunkPath.id == 0 { + buf.data, err = buf.opt.Compressor.Decompress(data, buf.data[:0]) + if err != nil { + buf.opt.Logger.Error("failed to decompress zero chunk, skipping", zap.String("path", chunkPath.path), zap.Error(err)) + + buf.data = buf.data[:cap(buf.data)] + + continue + } + + buf.off = int64(len(buf.data)) + + if cap(buf.data) > buf.opt.MaxCapacity { + buf.data = buf.data[:buf.opt.MaxCapacity:buf.opt.MaxCapacity] + } else { + buf.data = buf.data[:cap(buf.data)] + } + } else { + decompressedSize, err := buf.opt.Compressor.DecompressedSize(data) + if err != nil { + buf.opt.Logger.Error("failed to get size of compressed chunk, skipping", zap.String("path", chunkPath.path), zap.Error(err)) + + continue + } + + chunks = append(chunks, + chunk{ + compressed: data, + id: chunkPath.id, + size: decompressedSize, + }) + } + } + + // re-calculate all offsets + var sizeDecompressed int64 + + for i := range chunks { + sizeDecompressed += chunks[i].size + } + + buf.opt.Logger.Debug("loaded buffer from disk", + 
zap.Int("num_compressed_chunks", len(chunks)), + zap.Int64("current_chunk_bytes", buf.off), + zap.Int64("overall_decompressed_bytes", sizeDecompressed+buf.off), + zap.Strings("chunk_paths", xslices.Map(parsedChunkPaths, func(c parsedChunkPath) string { + return c.path + })), + ) + + // if chunk sizes are [10, 30, 20], the offsets will be [-60, -50, -20]. + // the current buffer starts at 0 and goes to buf.off (size of the buffer). + for i := range chunks { + chunks[i].startOffset = -sizeDecompressed + sizeDecompressed -= chunks[i].size + } + + buf.chunks = chunks + + return nil +} + +func (buf *Buffer) run() { + if buf.opt.PersistenceOptions.ChunkPath == "" { + // persistence is disabled + return + } + + buf.commandCh = make(chan persistenceCommand, 8) + startOffset := buf.off + + buf.wg.Add(1) + + go func() { + defer buf.wg.Done() + buf.runPersistence(buf.commandCh, startOffset) + }() +} + +type persistenceCommand struct { + data []byte + + chunkID int64 + drop bool +} + +func (buf *Buffer) chunkPath(chunkID int64) string { + return buf.opt.PersistenceOptions.ChunkPath + "." 
+ strconv.FormatInt(chunkID, 10) +} + +//nolint:gocognit +func (buf *Buffer) runPersistence(ch <-chan persistenceCommand, lastPersistedOffset int64) { + var ( + timerC <-chan time.Time + timer *time.Timer + ) + + defer func() { + if timer == nil { + return + } + + if !timer.Stop() { + <-timer.C + } + }() + + setTimer := func() { + interval := buf.opt.PersistenceOptions.NextInterval() + + if timer == nil { + timer = time.NewTimer(interval) + timerC = timer.C + } else { + timer.Reset(interval) + } + } + + if buf.opt.PersistenceOptions.FlushInterval > 0 { + setTimer() + } + + chunkPath0 := buf.chunkPath(0) + +persistLoop: + for { + select { + case command, ok := <-ch: + if !ok { + break persistLoop + } + + chunkPath := buf.chunkPath(command.chunkID) + + if command.drop { + if err := os.Remove(chunkPath); err != nil { + buf.opt.Logger.Error("failed to remove chunk", zap.String("path", chunkPath), zap.Error(err)) + } else { + buf.opt.Logger.Debug("dropped old chunk", zap.String("path", chunkPath)) + } + } else { + if err := atomicWriteFile(chunkPath, command.data, 0o644); err != nil { + buf.opt.Logger.Error("failed to write compressed chunk", zap.String("path", chunkPath), zap.Error(err)) + } else { + buf.opt.Logger.Debug("persisted rotated chunk", zap.String("path", chunkPath)) + } + } + case <-timerC: + // persist current chunk if changed + currentOffset := buf.Offset() + + if persisted, err := buf.persistCurrentChunk(currentOffset, lastPersistedOffset, chunkPath0); err != nil { + buf.opt.Logger.Error("failed to persist current chunk on timer", zap.Error(err), zap.String("path", chunkPath0)) + } else if persisted { + lastPersistedOffset = currentOffset + + buf.opt.Logger.Debug("persisted current chunk on timer", zap.Int64("offset", currentOffset), zap.String("path", chunkPath0)) + } + + setTimer() + } + } + + // command channel is closed, persist the current chunk + if persisted, err := buf.persistCurrentChunk(buf.Offset(), lastPersistedOffset, chunkPath0); err != nil 
{ + buf.opt.Logger.Error("failed to persist current chunk on close", zap.Error(err), zap.String("path", chunkPath0)) + } else if persisted { + buf.opt.Logger.Debug("persisted current chunk on close", zap.Int64("offset", buf.Offset()), zap.String("path", chunkPath0)) + } +} + +func (buf *Buffer) persistCurrentChunk(currentOffset, lastPersistedOffset int64, chunkPath0 string) (persisted bool, err error) { + if currentOffset == lastPersistedOffset { + return false, nil + } + + buf.mu.Lock() + data := slices.Clone(buf.data[:currentOffset%int64(cap(buf.data))]) + buf.mu.Unlock() + + compressed, err := buf.opt.Compressor.Compress(data, nil) + if err != nil { + return false, fmt.Errorf("failed to compress current chunk: %w", err) + } + + if err = atomicWriteFile(chunkPath0, compressed, 0o644); err != nil { + return false, fmt.Errorf("failed to write compressed chunk: %w", err) + } + + return true, nil +} + +func atomicWriteFile(path string, data []byte, mode fs.FileMode) error { + tmpPath := path + ".tmp" + + if err := os.WriteFile(tmpPath, data, mode); err != nil { + return fmt.Errorf("failed to write temporary file: %w", err) + } + + if err := os.Rename(tmpPath, path); err != nil { + os.Remove(tmpPath) //nolint:errcheck + + return fmt.Errorf("failed to rename temporary file: %w", err) + } + + return nil +} diff --git a/persistence_test.go b/persistence_test.go new file mode 100644 index 0000000..8e68bb9 --- /dev/null +++ b/persistence_test.go @@ -0,0 +1,298 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package circular_test + +import ( + "bytes" + "io" + "io/fs" + "os" + "path/filepath" + "slices" + "strconv" + "testing" + "time" + + "github.com/siderolabs/gen/xtesting/must" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/zap/zaptest" + + "github.com/siderolabs/go-circular" + "github.com/siderolabs/go-circular/zstd" +) + +func TestLoad(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + + options []circular.OptionFunc + + chunkSizes []int + + numCompressedChunks int + }{ + { + name: "no chunks", + + numCompressedChunks: 5, + }, + { + name: "expected number of chunks", + + numCompressedChunks: 5, + + chunkSizes: []int{1024, 2048, 1024, 1024, 3072, 4096}, + }, + { + name: "less chunks than expected", + + numCompressedChunks: 5, + + chunkSizes: []int{10240, 20480}, + }, + { + name: "more chunks than expected", + + numCompressedChunks: 4, + + chunkSizes: []int{10240, 2048, 1024, 1024, 3072, 4096}, + }, + { + name: "single chunk", + + numCompressedChunks: 2, + + chunkSizes: []int{10240}, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + logger := zaptest.NewLogger(t) + + compressor := must.Value(zstd.NewCompressor())(t) + + for i, size := range test.chunkSizes { + chunkData := bytes.Repeat([]byte{byte(i)}, size) + chunkData = must.Value(compressor.Compress(chunkData, nil))(t) + + chunkPath := filepath.Join(dir, "chunk."+strconv.Itoa(i)) + + require.NoError(t, os.WriteFile(chunkPath, chunkData, 0o644)) + } + + buf, err := circular.NewBuffer(append(test.options, + circular.WithNumCompressedChunks(test.numCompressedChunks, compressor), + circular.WithPersistence(circular.PersistenceOptions{ + ChunkPath: filepath.Join(dir, "chunk"), + }), + circular.WithLogger(logger), + )...) 
+ require.NoError(t, err) + + actualData, err := io.ReadAll(buf.GetReader()) + require.NoError(t, err) + + var expectedData []byte + + firstIdx := 1 + + if len(test.chunkSizes) > test.numCompressedChunks+1 { + firstIdx = len(test.chunkSizes) - test.numCompressedChunks + } + + for i := firstIdx; i < firstIdx-1+min(len(test.chunkSizes), test.numCompressedChunks+1); i++ { + expectedData = append(expectedData, bytes.Repeat([]byte{byte(i)}, test.chunkSizes[i])...) + } + + if len(test.chunkSizes) > 0 { + expectedData = append(expectedData, bytes.Repeat([]byte{0}, test.chunkSizes[0])...) + } + + if expectedData == nil { + expectedData = []byte{} + } + + require.Equal(t, expectedData, actualData) + + require.NoError(t, buf.Close()) + }) + } +} + +func TestPersist(t *testing.T) { + t.Parallel() + + for _, test := range []struct { //nolint:govet + name string + + numCompressedChunks int + + options []circular.OptionFunc + + sizes []int64 + }{ + { + name: "empty", + + numCompressedChunks: 5, + + sizes: []int64{0, 0}, + }, + { + name: "some data", + + numCompressedChunks: 5, + + sizes: []int64{2048, 2048 * 2048, 131072}, + }, + { + name: "writes close to max capacity", + + numCompressedChunks: 6, + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(128), + circular.WithMaxCapacity(1024), + circular.WithSafetyGap(4), + }, + sizes: []int64{1019, 1019}, + }, + { + name: "uneven writes", + + numCompressedChunks: 6, + + sizes: []int64{1024*1024 + 1, 1024*1024 - 1, 1024*1024 - 1, 1024*1024 - 1, 1024*1024 - 1, 1024*1024 - 1}, + }, + { + name: "dropping old chunks", + + numCompressedChunks: 3, + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(128), + circular.WithMaxCapacity(1024), + circular.WithSafetyGap(1), + }, + sizes: []int64{2048, 2048 + 256, 1024, 1024}, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + logger := zaptest.NewLogger(t) + + compressor := must.Value(zstd.NewCompressor())(t) + + 
expectedContents := []byte{} + + for _, size := range append(test.sizes, 0) { // appending zero to run one more iteration verifying previous write + data := make([]byte, size) + + for i := range data { + data[i] = byte(i % 256) + } + + buf, err := circular.NewBuffer(append(test.options, + circular.WithNumCompressedChunks(test.numCompressedChunks, compressor), + circular.WithPersistence(circular.PersistenceOptions{ + ChunkPath: filepath.Join(dir, "chunk"), + }), + circular.WithLogger(logger), + )...) + require.NoError(t, err) + + bufferContentsAfterLoad, err := io.ReadAll(buf.GetReader()) + require.NoError(t, err) + + require.Equal(t, expectedContents, bufferContentsAfterLoad) + + _, err = buf.Write(data) + require.NoError(t, err) + + expectedContents = append(expectedContents, data...) + + for len(expectedContents) > buf.MaxCapacity() { + expectedContents = slices.Delete(expectedContents, 0, buf.Capacity()) + } + + require.NoError(t, buf.Close()) + } + }) + } +} + +func TestFlush(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + logger := zaptest.NewLogger(t) + + compressor := must.Value(zstd.NewCompressor())(t) + + buf, err := circular.NewBuffer( + circular.WithNumCompressedChunks(5, compressor), + circular.WithPersistence(circular.PersistenceOptions{ + ChunkPath: filepath.Join(dir, "chunk"), + FlushInterval: 10 * time.Millisecond, + }), + circular.WithLogger(logger), + ) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, buf.Close()) + }) + + _, err = buf.Write([]byte("hello")) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.FileExists(t, filepath.Join(dir, "chunk.0")) + }, time.Second, 10*time.Millisecond) + + st1, err := os.Stat(filepath.Join(dir, "chunk.0")) + require.NoError(t, err) + + _, err = buf.Write([]byte("world")) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + var st2 fs.FileInfo + + st2, err = os.Stat(filepath.Join(dir, "chunk.0")) + 
require.NoError(t, err) + + assert.Greater(t, st2.Size(), st1.Size()) + }, time.Second, 10*time.Millisecond) + + // without closing buf, re-open it and check if data is still there + buf2, err := circular.NewBuffer( + circular.WithNumCompressedChunks(5, compressor), + circular.WithPersistence(circular.PersistenceOptions{ + ChunkPath: filepath.Join(dir, "chunk"), + }), + circular.WithLogger(logger), + ) + require.NoError(t, err) + + actualData, err := io.ReadAll(buf2.GetReader()) + require.NoError(t, err) + + require.Equal(t, []byte("helloworld"), actualData) + + require.NoError(t, buf2.Close()) +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/zstd/zstd.go b/zstd/zstd.go index 08c5bda..8a6b1b1 100644 --- a/zstd/zstd.go +++ b/zstd/zstd.go @@ -6,6 +6,8 @@ package zstd import ( + "errors" + "github.com/klauspost/compress/zstd" ) @@ -42,3 +44,22 @@ func (c *Compressor) Compress(src, dest []byte) ([]byte, error) { func (c *Compressor) Decompress(src, dest []byte) ([]byte, error) { return c.dec.DecodeAll(src, dest) } + +// DecompressedSize returns the size of the decompressed data. +func (c *Compressor) DecompressedSize(src []byte) (int64, error) { + if len(src) == 0 { + return 0, nil + } + + var header zstd.Header + + if err := header.Decode(src); err != nil { + return 0, err + } + + if header.HasFCS { + return int64(header.FrameContentSize), nil + } + + return 0, errors.New("frame content size is not set") +} diff --git a/zstd/zstd_test.go b/zstd/zstd_test.go new file mode 100644 index 0000000..2a7d292 --- /dev/null +++ b/zstd/zstd_test.go @@ -0,0 +1,61 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package zstd_test + +import ( + "crypto/rand" + "io" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/siderolabs/go-circular/zstd" +) + +func TestCompressor(t *testing.T) { + t.Parallel() + + compressor, err := zstd.NewCompressor() + require.NoError(t, err) + + for _, test := range []struct { + size int + }{ + { + size: 0, + }, + { + size: 1024, + }, + { + size: 1024 * 1024, + }, + } { + t.Run(strconv.Itoa(test.size), func(t *testing.T) { + t.Parallel() + + data, err := io.ReadAll(io.LimitReader(rand.Reader, int64(test.size))) + require.NoError(t, err) + + compressed, err := compressor.Compress(data, nil) + require.NoError(t, err) + + decompressed, err := compressor.Decompress(compressed, nil) + require.NoError(t, err) + + if len(data) == 0 { + data = nil + } + + require.Equal(t, data, decompressed) + + decompressedSize, err := compressor.DecompressedSize(compressed) + require.NoError(t, err) + + require.Equal(t, int64(len(data)), decompressedSize) + }) + } +}