Skip to content

Commit

Permalink
chore: cachettl configuration option for not refreshing ttl (#643)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Sep 18, 2024
1 parent 319ed4c commit 1961d98
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 27 deletions.
38 changes: 24 additions & 14 deletions cachettl/cachettl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
// the tail node (end) is the node with the highest expiration time
// Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory.
type Cache[K comparable, V any] struct {
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]
now func() time.Time
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]

config cacheConfig
onEvicted func(key K, value V)
}

Expand All @@ -32,24 +33,31 @@ func (n *node[K, V]) remove() {
}

// New returns a new Cache.
func New[K comparable, V any]() *Cache[K, V] {
return &Cache[K, V]{
now: time.Now,
func New[K comparable, V any](opts ...Opt) *Cache[K, V] {
c := &Cache[K, V]{
config: cacheConfig{
now: time.Now,
refreshTTL: true,
},
root: &node[K, V]{},
m: make(map[K]*node[K, V]),
}
for _, opt := range opts {
opt(&c.config)
}
return c
}

// Get returns the value associated with the key or nil otherwise.
// Additionally, Get() will refresh the TTL and cleanup expired nodes.
// Additionally, Get() will refresh the TTL by default and cleanup expired nodes.
func (c *Cache[K, V]) Get(key K) (zero V) {
c.mu.Lock()
defer c.mu.Unlock()

defer func() { // remove expired nodes
cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail
for cn != nil && cn != c.root {
if c.now().After(cn.expiration) {
if c.config.now().After(cn.expiration) {
cn.remove() // removes a node from the linked list (leaves the map untouched)
delete(c.m, cn.key) // remove node from map too
if c.onEvicted != nil { // call the OnEvicted callback if it's set
Expand All @@ -62,10 +70,12 @@ func (c *Cache[K, V]) Get(key K) (zero V) {
}
}()

if n, ok := c.m[key]; ok && n.expiration.After(c.now()) {
n.remove()
n.expiration = c.now().Add(n.ttl) // refresh TTL
c.add(n)
if n, ok := c.m[key]; ok && n.expiration.After(c.config.now()) {
if c.config.refreshTTL {
n.remove()
n.expiration = c.config.now().Add(n.ttl) // refresh TTL
c.add(n)
}
return n.value
}
return zero
Expand All @@ -77,7 +87,7 @@ func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

now := c.now()
now := c.config.now()

n, ok := c.m[key]
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions cachettl/cachettl_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cachettl

import "time"

type Opt func(*cacheConfig)

// WithNoRefreshTTL disables the refresh of the TTL when the cache is accessed.
var WithNoRefreshTTL = func(c *cacheConfig) {
c.refreshTTL = false
}

// WithNow sets the function to use to get the current time.
var WithNow = func(now func() time.Time) Opt {
return func(c *cacheConfig) {
c.now = now
}
}

type cacheConfig struct {
now func() time.Time
refreshTTL bool
}
50 changes: 39 additions & 11 deletions cachettl/cachettl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
func TestCacheTTL(t *testing.T) {
now := time.Now()

c := New[string, string]()
c.now = func() time.Time { return now }
c := New[string, string](WithNow(func() time.Time { return now }))

// nothing done so far, we expect the cache to be empty
require.Nil(t, c.slice())
Expand All @@ -37,30 +36,30 @@ func TestCacheTTL(t *testing.T) {
require.Equal(t, []string{"222", "111", "333"}, c.slice())

// move time forward to expire "222"
c.now = func() time.Time { return now.Add(1) } // "222" should still be there
require.Empty(t, c.Get("whatever")) // trigger the cleanup
c.config.now = func() time.Time { return now.Add(1) } // "222" should still be there
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Equal(t, []string{"222", "111", "333"}, c.slice())

c.now = func() time.Time { return now.Add(2) } // "222" should still be there
require.Empty(t, c.Get("whatever")) // trigger the cleanup
c.config.now = func() time.Time { return now.Add(2) } // "222" should still be there
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Equal(t, []string{"222", "111", "333"}, c.slice())

c.now = func() time.Time { return now.Add(3) } // "222" should be expired!
require.Empty(t, c.Get("whatever")) // trigger the cleanup
c.config.now = func() time.Time { return now.Add(3) } // "222" should be expired!
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Equal(t, []string{"111", "333"}, c.slice())

// let's move a lot forward to expire everything
c.now = func() time.Time { return now.Add(6) }
c.config.now = func() time.Time { return now.Add(6) }
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Nil(t, c.slice())
require.Len(t, c.m, 0)

// now let's set a key, then move forward and get it directly without triggering with a different key
c.now = func() time.Time { return now }
c.config.now = func() time.Time { return now }
c.Put("last", "999", 1)
require.Equal(t, "999", c.Get("last"))
require.Equal(t, []string{"999"}, c.slice())
c.now = func() time.Time { return now.Add(2) }
c.config.now = func() time.Time { return now.Add(2) }
require.Empty(t, c.Get("last")) // trigger the cleanup
require.Nil(t, c.slice())
require.Len(t, c.m, 0)
Expand All @@ -86,3 +85,32 @@ func TestRefreshTTL(t *testing.T) {
require.Equal(t, "333", c.Get("three"))
require.Equal(t, []string{"111", "222", "333"}, c.slice())
}

func TestNoRefreshTTL(t *testing.T) {
now := time.Now()
c := New[string, string](WithNoRefreshTTL, WithNow(func() time.Time { return now }))

// nothing done so far, we expect the cache to be empty
require.Nil(t, c.slice())

c.Put("one", "111", time.Second)
c.Put("two", "222", time.Second)
c.Put("three", "333", time.Second)
require.Equal(t, []string{"111", "222", "333"}, c.slice())

now = now.Add(500 * time.Millisecond)
require.Equal(t, "111", c.Get("one"))
require.Equal(t, []string{"111", "222", "333"}, c.slice())

require.Equal(t, "222", c.Get("two"))
require.Equal(t, []string{"111", "222", "333"}, c.slice())

require.Equal(t, "333", c.Get("three"))
require.Equal(t, []string{"111", "222", "333"}, c.slice())

now = now.Add(500 * time.Millisecond)

require.Empty(t, c.Get("one"))
require.Empty(t, c.Get("two"))
require.Empty(t, c.Get("three"))
}
5 changes: 3 additions & 2 deletions resourcettl/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// NewCache creates a new resource cache.
//
// - ttl - is the time after which the resource is considered expired and cleaned up.
// - opts - options for the cache.
//
// A resource's ttl is extended every time it is checked out.
//
Expand All @@ -27,14 +28,14 @@ import (
// - Close() error
// - Stop()
// - Stop() error
func NewCache[K comparable, R any](ttl time.Duration) *Cache[K, R] {
func NewCache[K comparable, R any](ttl time.Duration, opts ...cachettl.Opt) *Cache[K, R] {
c := &Cache[K, R]{
keyMu: kitsync.NewPartitionLocker(),
resources: make(map[string]R),
checkouts: make(map[string]int),
expiries: make(map[string]struct{}),
ttl: ttl,
ttlcache: cachettl.New[K, string](),
ttlcache: cachettl.New[K, string](opts...),
}
c.ttlcache.OnEvicted(c.onEvicted)
return c
Expand Down
33 changes: 33 additions & 0 deletions resourcettl/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/cachettl"
"github.com/rudderlabs/rudder-go-kit/resourcettl"
)

Expand Down Expand Up @@ -138,6 +139,38 @@ func TestCache(t *testing.T) {
require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource")
checkin3()
})

t.Run("no ttl refresh", func(t *testing.T) {
now := time.Now()
ttl = 1 * time.Second
producer := &MockProducer{}
c := resourcettl.NewCache[string, *cleanuper](ttl, cachettl.WithNoRefreshTTL, cachettl.WithNow(func() time.Time { return now }))

r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err1, "it should be able to create a new resource")
require.NotNil(t, r1, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource")

now = now.Add(ttl / 2) // wait for some time less than ttl
checkin1()

r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err2, "it should be able to checkout the same resource")
require.NotNil(t, r2, "it should return a resource")
require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource")
require.Equal(t, r1.id, r2.id, "it should return the same resource")

now = now.Add(ttl / 2) // wait for some time less than ttl
checkin2()

r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper)
require.NoError(t, err3, "it should be able to create a new resource")
require.NotNil(t, r3, "it should return a resource")
require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired")
require.NotEqual(t, r1.id, r3.id, "it should return a different resource")
time.Sleep(time.Millisecond)
checkin3()
})
}

type MockProducer struct {
Expand Down

0 comments on commit 1961d98

Please sign in to comment.