Skip to content

Commit

Permalink
feat: add persistence support
Browse files Browse the repository at this point in the history
Add support to persist and load buffer from disk.

Co-authored-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira and utkuozdemir committed May 14, 2024
1 parent 3c48c53 commit a874ed6
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 2 deletions.
3 changes: 2 additions & 1 deletion chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ type chunk struct {
startOffset int64
// uncompressed size of the chunk
size int64
// [TODO]: have a unique (incrementing?) chunk ID for file-based storage
// unique chunk ID
id int64
}
15 changes: 15 additions & 0 deletions circular.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"sync"
)

// Design note: the Buffer sends persistence commands over a channel to the
// persister goroutine — e.g. "write new chunk X" or "delete chunk Y".
//
// TODO: on Close(), stop this channel and decide whether any remaining
// chunks still need to be persisted.

// Buffer implements circular buffer with a thread-safe writer,
// that supports multiple readers each with its own offset.
type Buffer struct {
Expand Down Expand Up @@ -59,6 +63,10 @@ func NewBuffer(opts ...OptionFunc) (*Buffer, error) {
buf.data = make([]byte, buf.opt.InitialCapacity)
buf.cond = sync.NewCond(&buf.mu)

if err := buf.load(); err != nil {
return nil, err
}

return buf, nil
}

Expand Down Expand Up @@ -122,10 +130,17 @@ func (buf *Buffer) Write(p []byte) (int, error) {
return n, err
}

var maxID int64

for _, c := range buf.chunks {
maxID = max(c.id, maxID)
}

buf.chunks = append(buf.chunks, chunk{
compressed: compressed,
startOffset: buf.off - int64(buf.opt.MaxCapacity),
size: int64(buf.opt.MaxCapacity),
id: maxID + 1,
})

if len(buf.chunks) > buf.opt.NumCompressedChunks {
Expand Down
49 changes: 48 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,51 @@

package circular

import "fmt"
import (
"fmt"
"time"
)

// Options defines settings for Buffer.
type Options struct {
	// Compressor produces and consumes the compressed representation of
	// chunks; required when NumCompressedChunks > 0 or persistence is enabled.
	Compressor Compressor

	// PersistenceOptions controls optional persistence of the buffer to disk.
	PersistenceOptions PersistenceOptions

	// InitialCapacity is the initial size (in bytes) of the live buffer.
	InitialCapacity int
	// MaxCapacity is the size (in bytes) the live buffer grows to; a full
	// buffer is rotated into a compressed chunk of this uncompressed size.
	MaxCapacity int
	// SafetyGap — presumably headroom kept between the writer and readers;
	// NOTE(review): confirm exact semantics against the Buffer implementation.
	SafetyGap int

	// NumCompressedChunks is the maximum number of compressed chunks kept
	// (in memory and on disk).
	NumCompressedChunks int
}

// PersistenceOptions defines settings for Buffer persistence.
type PersistenceOptions struct {
	// ChunkPath is the base path to store the chunk files.
	//
	// Example: /var/log/machine/my-machine.log, chunks will be stored
	// by appending a chunk ID to this path, e.g. /var/log/machine/my-machine.log.3.
	//
	// If ChunkPath is empty, persistence is disabled.
	ChunkPath string

	// FlushInterval flushes buffer content to disk every FlushInterval (if there were any changes).
	//
	// If FlushInterval is zero, no periodic flushing is performed.
	FlushInterval time.Duration

	// FlushJitter adds random jitter to FlushInterval to avoid thundering herd problem (a ratio of FlushInterval).
	//
	// Must be in the range [0, 1].
	FlushJitter float64
}

// Compressor implements an optional interface for chunk compression.
//
// Compress and Decompress append to the dest slice and return the result.
//
// Compressor should be safe for concurrent use by multiple goroutines.
// Compressor should verify checksums of the compressed data.
type Compressor interface {
	// Compress appends the compressed form of src to dest and returns the result.
	Compress(src, dest []byte) ([]byte, error)
	// Decompress appends the decompressed form of src to dest and returns the result.
	Decompress(src, dest []byte) ([]byte, error)
	// DecompressedSize returns the decompressed size of the compressed data in src.
	DecompressedSize(src []byte) (int64, error)
}

// defaultOptions returns default initial values.
Expand Down Expand Up @@ -95,3 +121,24 @@ func WithNumCompressedChunks(num int, c Compressor) OptionFunc {
return nil
}
}

// WithPersistence enables buffer persistence to disk.
//
// A compressor must already be configured on the options when this option is
// applied, since persisted chunks are stored compressed.
func WithPersistence(options PersistenceOptions) OptionFunc {
	return func(opt *Options) error {
		switch {
		case options.ChunkPath == "":
			return fmt.Errorf("chunk path should be set")
		case options.FlushJitter < 0 || options.FlushJitter > 1:
			return fmt.Errorf("flush jitter should be in range [0, 1]: %f", options.FlushJitter)
		case opt.Compressor == nil:
			return fmt.Errorf("compressor should be set for persistence")
		}

		opt.PersistenceOptions = options

		return nil
	}
}
203 changes: 203 additions & 0 deletions persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package circular

import (
"cmp"
"context"
"math/rand"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"time"
)

// load restores persisted buffer state from disk.
//
// It scans for files matching ChunkPath.<id>, prunes stale compressed chunks
// beyond NumCompressedChunks, decompresses chunk ID 0 (the snapshot of the
// live buffer) into buf.data, and registers the remaining chunks as
// compressed history with recalculated start offsets.
//
// It is a no-op when persistence is disabled. Unreadable or corrupted chunk
// files are skipped (best effort), never returned as errors.
func (buf *Buffer) load() error {
	if buf.opt.PersistenceOptions.ChunkPath == "" {
		// persistence is disabled
		return nil
	}

	chunkPaths, err := filepath.Glob(buf.opt.PersistenceOptions.ChunkPath + ".*")
	if err != nil {
		return err
	}

	type parsedChunkPath struct {
		path string
		id   int64
	}

	parsedChunkPaths := make([]parsedChunkPath, 0, len(chunkPaths))

	for _, chunkPath := range chunkPaths {
		// the chunk ID is the decimal suffix after the last '.'
		dotIdx := strings.LastIndexByte(chunkPath, '.')
		if dotIdx == -1 {
			continue
		}

		id, err := strconv.ParseInt(chunkPath[dotIdx+1:], 10, 64)
		if err != nil || id < 0 {
			// not a chunk file, ignore it
			continue
		}

		parsedChunkPaths = append(parsedChunkPaths, parsedChunkPath{
			id:   id,
			path: chunkPath,
		})
	}

	// sort chunks by ID, from smallest to biggest
	slices.SortFunc(parsedChunkPaths, func(a, b parsedChunkPath) int {
		return cmp.Compare(a.id, b.id)
	})

	// chunk ID 0 is the live-buffer snapshot, not a compressed history chunk,
	// so it doesn't count against NumCompressedChunks
	firstCompressed := 0

	if len(parsedChunkPaths) > 1 && parsedChunkPaths[0].id == 0 {
		firstCompressed = 1
	}

	// drop the oldest compressed chunks above the configured limit
	if len(parsedChunkPaths)-firstCompressed > buf.opt.NumCompressedChunks {
		for j := firstCompressed; j < len(parsedChunkPaths)-buf.opt.NumCompressedChunks; j++ {
			// best effort: a chunk that fails to be removed from disk is
			// still dropped from the in-memory list below
			os.Remove(parsedChunkPaths[j].path) //nolint:errcheck
		}

		parsedChunkPaths = slices.Delete(parsedChunkPaths, firstCompressed, len(parsedChunkPaths)-buf.opt.NumCompressedChunks)
	}

	chunks := make([]chunk, 0, len(parsedChunkPaths))

	for _, chunkPath := range parsedChunkPaths {
		data, err := os.ReadFile(chunkPath.path)
		if err != nil {
			// failed to read the chunk, skip it
			continue
		}

		if chunkPath.id == 0 {
			// decompress the snapshot directly into the live buffer
			buf.data, err = buf.opt.Compressor.Decompress(data, buf.data[:0])
			if err != nil {
				// failed to decompress, restore full capacity and carry on
				buf.data = buf.data[:cap(buf.data)]

				continue
			}

			buf.off = int64(len(buf.data))
			buf.data = buf.data[:cap(buf.data)]
		} else {
			decompressedSize, err := buf.opt.Compressor.DecompressedSize(data)
			if err != nil {
				// failed to get the decompressed size, skip the chunk
				continue
			}

			chunks = append(chunks,
				chunk{
					compressed: data,
					id:         chunkPath.id,
					size:       decompressedSize,
				})
		}
	}

	// re-calculate all offsets; totalSize is the sum of the *decompressed*
	// chunk sizes
	var totalSize int64

	for i := range chunks {
		totalSize += chunks[i].size
	}

	// if chunk sizes are [10, 30, 20], the offsets will be [-60, -50, -20].
	// the current buffer starts at 0 and goes to b.off (size of the buffer).
	for i := range chunks {
		chunks[i].startOffset = -totalSize
		totalSize -= chunks[i].size
	}

	buf.chunks = chunks

	return nil
}

// persistenceCommand describes a single operation for the persistence goroutine.
type persistenceCommand struct {
	// chunkID identifies the chunk file the command applies to.
	chunkID int64
	// drop, when true, requests removal of the chunk file instead of a write.
	drop bool

	// data is the compressed chunk contents to write (unused when drop is set).
	data []byte
}

// chunkPath returns the on-disk path for the chunk with the given ID,
// formed by appending "." and the decimal ID to the configured ChunkPath.
func (buf *Buffer) chunkPath(chunkID int64) string {
	return buf.opt.PersistenceOptions.ChunkPath + "." + strconv.FormatInt(chunkID, 10)
}

// runPersistence is the background persistence loop.
//
// It consumes commands from ch to write or delete compressed chunk files,
// and — when FlushInterval is set — periodically snapshots the live buffer
// to the chunk file with ID 0. It runs until ctx is canceled.
//
// Disk write/remove errors are intentionally ignored: persistence is best
// effort and must not disrupt the buffer itself.
func (buf *Buffer) runPersistence(ctx context.Context, ch <-chan persistenceCommand) {
	var (
		timerC <-chan time.Time
		timer  *time.Timer
	)

	defer func() {
		if timer == nil {
			return
		}

		// drain the timer channel if the timer fired but was not consumed
		if !timer.Stop() {
			<-timer.C
		}
	}()

	// setTimer (re)arms the flush timer with FlushInterval scaled by a random
	// factor in [1-FlushJitter, 1+FlushJitter]
	setTimer := func() {
		interval := time.Duration(((rand.Float64()*2-1)*buf.opt.PersistenceOptions.FlushJitter + 1.0) * float64(buf.opt.PersistenceOptions.FlushInterval))

		if timer == nil {
			timer = time.NewTimer(interval)
			timerC = timer.C
		} else {
			timer.Reset(interval)
		}
	}

	if buf.opt.PersistenceOptions.FlushInterval > 0 {
		setTimer()
	}

	for {
		select {
		case <-ctx.Done():
			return
		case command := <-ch:
			if command.drop {
				os.Remove(buf.chunkPath(command.chunkID)) //nolint:errcheck // best effort
			} else {
				os.WriteFile(buf.chunkPath(command.chunkID), command.data, 0o644) //nolint:errcheck // best effort
			}
		case <-timerC:
			// periodic flush: snapshot the current buffer contents under the
			// lock, compress and write outside of it
			//
			// TODO: track modifications and skip the flush when the buffer
			// hasn't changed since the last snapshot
			buf.mu.Lock()
			data := slices.Clone(buf.data[:buf.off%int64(cap(buf.data))])
			buf.mu.Unlock()

			// NOTE(review): when buf.off is a non-zero multiple of cap(buf.data),
			// this snapshot is empty — confirm the writer keeps off within capacity

			compressed, err := buf.opt.Compressor.Compress(data, nil)
			if err != nil {
				// do NOT write the file on compression failure: that would
				// clobber the previous valid snapshot with garbage
				setTimer()

				continue
			}

			os.WriteFile(buf.chunkPath(0), compressed, 0o644) //nolint:errcheck // best effort

			setTimer()
		}
	}
}
Loading

0 comments on commit a874ed6

Please sign in to comment.