Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache persistence to disk #778

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 226 additions & 11 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ limitations under the License.
package cache

import (
"bufio"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"slices"
"sort"
"sync"
Expand Down Expand Up @@ -57,20 +63,24 @@ type cache[T any] struct {
index map[string]*item[T]
// items is the store of elements in the cache.
items []*item[T]

// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
snapshotPath string
buf buffer
// sorted indicates whether the items are sorted by expiration time.
// It is initially true, and set to false when the items are not sorted.
sorted bool
// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}
var _ Persistable = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
Expand All @@ -80,11 +90,12 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
}

c := &cache[T]{
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
labelsFunc: opt.labelsFunc,
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
snapshotPath: opt.snapshotPath,
labelsFunc: opt.labelsFunc,
janitor: &janitor[T]{
interval: opt.interval,
stop: make(chan bool),
Expand All @@ -97,6 +108,16 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]

C := &Cache[T]{cache: c, keyFunc: keyFunc}

if c.snapshotPath != "" {
// load the cache from the file if it exists
if _, err := os.Stat(c.snapshotPath); err == nil {
err = c.load()
if err != nil {
return nil, err
}
}
}

if opt.interval > 0 {
go c.janitor.run(c)
}
Expand Down Expand Up @@ -498,3 +519,197 @@ func (j *janitor[T]) run(c *cache[T]) {
}
}
}

// buffer is an append-only byte slice used to stage the serialized
// form of the cache before it is written out.
type buffer []byte

// clear resets the buffer to zero length. The backing array is kept,
// so subsequent writes can reuse the already-allocated capacity.
func (s *buffer) clear() {
	current := *s
	*s = current[:0]
}

// writeByteSlice appends the bytes of v to the end of the buffer.
func (s *buffer) writeByteSlice(v []byte) {
	grown := append(*s, v...)
	*s = grown
}

// writeUint64 appends v to the buffer as eight bytes in little endian
// byte order.
func (s *buffer) writeUint64(v uint64) {
	*s = binary.LittleEndian.AppendUint64(*s, v)
}

// writeBuf writes the staged bytes in c.buf to file and then syncs the
// file to disk.
//
// It returns the write error if the write fails, otherwise the result
// of the sync. The sync error is propagated rather than dropped: if the
// flush fails, the data may not have reached stable storage and the
// caller must not treat the file as a valid snapshot.
func (c *cache[T]) writeBuf(file *os.File) error {
	if _, err := file.Write(c.buf); err != nil {
		return err
	}
	// sync the file to disk straight away
	return file.Sync()
}

// Persist writes the cache to disk at c.snapshotPath.
//
// The cache is first serialized into c.buf and written to a temporary
// file named "<snapshotPath>.tmp", which is then renamed over the final
// file name. Writing to a temporary file and renaming avoids leaving a
// half-written snapshot behind if the process crashes mid-write. If a
// file with the final name already exists, it is overwritten.
//
// The snapshot is a sequence of records, one per item:
//
//	key length, key, expiration, data length, data
//
// The key length and data length are uint64 in little endian format.
// The expiration is the item's Unix time in nanoseconds, as uint64 in
// little endian format. The key is the raw key bytes and the data is the
// JSON encoding of the cached object.
//
// The cache mutex is held for the whole operation. An error is returned
// if no snapshot path is configured, if serialization fails, or if any
// file operation fails. On failure after the temporary file was created,
// the temporary file is closed and removed on a best-effort basis and any
// cleanup error is joined to the returned error.
func (c *cache[T]) Persist() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Without a path, the temp file would be created as ".tmp" in the
	// working directory and the rename to "" would then fail.
	if c.snapshotPath == "" {
		return errors.New("no snapshot path configured")
	}

	if err := c.writeToBuf(); err != nil {
		return err
	}

	tmpPath := c.snapshotPath + ".tmp"

	// create new temp file
	tmpFile, err := os.Create(tmpPath)
	if err != nil {
		// Nothing was created, so there is nothing to clean up.
		return err
	}

	if err := c.writeBuf(tmpFile); err != nil {
		// Close before removing so the descriptor is not leaked and the
		// removal also works on platforms that refuse to delete open files.
		return errors.Join(err, tmpFile.Close(), os.Remove(tmpPath))
	}

	// close the file
	if err := tmpFile.Close(); err != nil {
		return errors.Join(err, os.Remove(tmpPath))
	}

	if err := os.Rename(tmpPath, c.snapshotPath); err != nil {
		// Do not leave the orphaned temp file behind.
		return errors.Join(fmt.Errorf("failed to rename file: %w", err), os.Remove(tmpPath))
	}

	return nil
}

// writeToBuf writes the cache to the buffer
// no locks are taken, the caller should ensure that
// the cache is not being modified while this function is called.
func (c *cache[T]) writeToBuf() error {
Copy link
Contributor

@darkowlzz darkowlzz Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all the cached data are of known type and in Go, can't we just use gob to achieve the same, that is to serialize the cache items and write to a buffer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To have better control. I am open to changing this. My reasoning is that if we ever want to offer different durability levels for the cache (mem vs on disk), we will need to deal with the underlying persistence layer on a per-item basis in order to be able to persist every change. We may never need it, but it's possible with this implementation.

Copy link
Contributor

@darkowlzz darkowlzz Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gob API deals with serialization at per object level. Looks similar to what the current code does, just the serialization of each cache item comes for free. How the serialized data is organized later and loaded is up to the higher level implementation. Gob just provides a nice encoder and decoder for simple serialization of Go objects.

c.buf.clear()
for _, item := range c.items {
data, err := json.Marshal(item.object)
if err != nil {
return err
}

// write the key, expiration and data to the buffer
// format: key length, key, expiration, data length, data
// doing this this way, gives us the ability to read the file
// without having to read the entire file into memory. This is
// done for possible future use cases e.g. where the cache file
// could be very large or for range queries.
c.buf.writeUint64(uint64(len(item.key)))
c.buf.writeByteSlice([]byte(item.key))
// we write the expiration time in nanoseconds as uint64
// instead of using item.expiresAt.MarshalBinary() because we are only
// interested in the nano second precision Unix time,
// everything else can be discarded.
c.buf.writeUint64(uint64(item.expiresAt.UnixNano()))
c.buf.writeUint64(uint64(len(data)))
c.buf.writeByteSlice(data)
}
return nil
}

// load populates the cache from the snapshot file at c.snapshotPath.
// The file is expected to hold a sequence of records in the format:
// key length, key, expiration, data length, data // repeated per item
//
// Records are added in file order until the cache reaches its capacity;
// any remaining records are dropped. When at least one item is present
// afterwards, the cached-items metric is updated and the items are
// marked as unsorted.
//
// No lock is taken: this function cannot be called concurrently, and
// should be called before the cache is used.
func (c *cache[T]) load() error {
	f, err := os.Open(c.snapshotPath)
	if err != nil {
		return err
	}
	defer f.Close()

	loaded, err := c.readFrom(bufio.NewReader(f))
	if err != nil {
		return err
	}

	// Stop as soon as either the input or the remaining capacity runs out.
	for i := 0; i < len(loaded) && len(c.items) < c.capacity; i++ {
		entry := loaded[i]
		c.items = append(c.items, entry)
		c.index[entry.key] = entry
	}

	if len(c.items) == 0 {
		return nil
	}

	c.metrics.setCachedItems(float64(len(c.items)))
	c.sorted = false
	return nil
}

// readFrom decodes records from rd one after another using readItem and
// returns them in the order they were read. Decoding stops when readItem
// reports io.EOF, which is treated as the normal end of input. Any other
// error aborts the read and is returned with a nil slice.
func (c *cache[T]) readFrom(rd io.Reader) ([]*item[T], error) {
	decoded := make([]*item[T], 0)
	for {
		next, err := c.readItem(rd)
		switch {
		case err == nil:
			decoded = append(decoded, next)
		case err == io.EOF:
			// end of input: everything read so far is the result
			return decoded, nil
		default:
			return nil, err
		}
	}
}

// readItem decodes a single record from rd. A record has the layout:
//
//	key length, key, expiration, data length, data
//
// where both lengths and the expiration are uint64 in little endian
// format, the expiration is a Unix time in nanoseconds, and the data is
// the JSON encoding of the cached object.
//
// It returns io.EOF only when rd is exhausted exactly on a record
// boundary, i.e. before any byte of the next record was read. Input that
// ends anywhere inside a record yields io.ErrUnexpectedEOF, so a
// truncated file is reported as an error instead of being mistaken for a
// clean end of input. A length field that cannot be represented as a
// valid size, or data that is not valid JSON for T, also yields an error.
func (c *cache[T]) readItem(rd io.Reader) (*item[T], error) {
	var (
		word [8]byte
		it   item[T]
	)

	// First field of the record: io.ReadFull reports io.EOF here only if
	// no bytes at all were available, which is the legitimate end of input.
	if _, err := io.ReadFull(rd, word[:]); err != nil {
		return nil, err
	}
	key, err := readSizedField(rd, binary.LittleEndian.Uint64(word[:]))
	if err != nil {
		return nil, err
	}
	it.key = string(key)

	if err := readFixedField(rd, word[:]); err != nil {
		return nil, err
	}
	// The expiration is stored as Unix nanoseconds, so it must be decoded
	// as nanoseconds; interpreting the value as seconds would produce a
	// wildly different expiry time.
	it.expiresAt = time.Unix(0, int64(binary.LittleEndian.Uint64(word[:])))

	if err := readFixedField(rd, word[:]); err != nil {
		return nil, err
	}
	data, err := readSizedField(rd, binary.LittleEndian.Uint64(word[:]))
	if err != nil {
		return nil, err
	}

	if err := json.Unmarshal(data, &it.object); err != nil {
		return nil, err
	}

	return &it, nil
}

// readFixedField fills buf from rd for a field that is not the first one
// in a record. Running out of input at this point means the record is
// truncated, so io.EOF is reported as io.ErrUnexpectedEOF.
func readFixedField(rd io.Reader, buf []byte) error {
	_, err := io.ReadFull(rd, buf)
	if err == io.EOF {
		return io.ErrUnexpectedEOF
	}
	return err
}

// readSizedField reads exactly n bytes from rd, where n comes from a
// length prefix in the input. The bytes are read through a limited
// reader instead of into a slice pre-allocated to n, so a corrupt length
// cannot trigger a huge allocation or a makeslice panic; memory grows
// only with the bytes actually present. Fewer than n available bytes
// yields io.ErrUnexpectedEOF.
func readSizedField(rd io.Reader, n uint64) ([]byte, error) {
	const maxInt64 = 1<<63 - 1
	if n > maxInt64 {
		return nil, fmt.Errorf("invalid field length %d", n)
	}
	data, err := io.ReadAll(io.LimitReader(rd, int64(n)))
	if err != nil {
		return nil, err
	}
	if uint64(len(data)) != n {
		return nil, io.ErrUnexpectedEOF
	}
	return data, nil
}
Loading
Loading