From 830a7f6422fa4eba77421ef719b423df8e4d12af Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 11 Dec 2024 15:50:28 +0100 Subject: [PATCH 01/12] fix: sessiontree bug (cherry picked from commit 7d48aef23354497be55958ad2f484e1734550249) --- pkg/relayer/session/sessiontree.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index bf83cf0ad..6259b2bcf 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -2,6 +2,7 @@ package session import ( "bytes" + "context" "crypto/sha256" "fmt" "os" @@ -95,7 +96,13 @@ func NewSessionTree( // contain a non-hashed Relay that could be used to validate the proof on-chain. trie := smt.NewSparseMerkleSumTrie(treeStore, protocol.NewTrieHasher(), smt.WithValueHasher(nil)) + logger := polylog.Ctx(context.TODO()).With( + "session_id", sessionHeader.SessionId, + "supplier_operator_address", supplierOperatorAddress, + ) + sessionTree := &sessionTree{ + logger: logger, sessionHeader: sessionHeader, storePath: storePath, treeStore: treeStore, @@ -268,7 +275,6 @@ func (st *sessionTree) Delete() error { } else { st.logger.With( "claim_root", fmt.Sprintf("%x", st.GetClaimRoot()), - "session_id", st.GetSessionHeader().SessionId, ).Info().Msg("KVStore is already stopped") } From b0f08c3b5b6b01b6901925b8ea74468193b6a755 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 10 Dec 2024 15:49:13 +0100 Subject: [PATCH 02/12] chore: add QueryCache interface --- pkg/client/interface.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 365c24b74..be0f88296 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -360,3 +360,11 @@ type BankQueryClient interface { // GetBalance queries the chain for the uPOKT balance of the account provided GetBalance(ctx context.Context, address string) (*cosmostypes.Coin, error) } + +// QueryCache handles a single type of cached data +type QueryCache[T any] interface { + Get(key string) (T, error) + Set(key string, value T) error + Delete(key string) + Clear() +} From 87432467b18a43f23f14837b2790be97817a3585 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 10 Dec 2024 15:49:41 +0100 Subject: [PATCH 03/12] chore: add HistoricalQueryCache interface --- pkg/client/interface.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index be0f88296..fdbc3ef31 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -368,3 +368,12 @@ type QueryCache[T any] interface { Delete(key string) Clear() } + +// HistoricalQueryCache extends QueryCache to support historical values at different heights +type HistoricalQueryCache[T any] interface { + QueryCache[T] + // GetAtHeight retrieves the nearest value <= the specified height + GetAtHeight(key string, height int64) (T, error) + // SetAtHeight adds or updates a value at a specific height + SetAtHeight(key string, value T, height int64) error +} From d8954414467fddaa0dbffa9efc9ea39d914b8f56 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 10 Dec 2024 15:53:19 +0100 Subject: [PATCH 04/12] feat: add InMemoryCache implementation --- pkg/client/query/cache/config.go | 76 ++++++ pkg/client/query/cache/errors.go | 10 + pkg/client/query/cache/memory.go | 245 ++++++++++++++++++ pkg/client/query/cache/memory_test.go | 343 ++++++++++++++++++++++++++ 4 files changed, 674 insertions(+) create mode 100644 pkg/client/query/cache/config.go 
create mode 100644 pkg/client/query/cache/errors.go create mode 100644 pkg/client/query/cache/memory.go create mode 100644 pkg/client/query/cache/memory_test.go diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go new file mode 100644 index 000000000..10153f139 --- /dev/null +++ b/pkg/client/query/cache/config.go @@ -0,0 +1,76 @@ +package cache + +import ( + "time" +) + +// EvictionPolicy determines how items are removed when cache is full. +type EvictionPolicy int64 + +const ( + FirstInFirstOut = EvictionPolicy(iota) + LeastRecentlyUsed + LeastFrequentlyUsed +) + +// CacheConfig is the configuration options for a cache. +type CacheConfig struct { + // MaxKeys is the maximum number of items the cache can hold. + MaxKeys int64 + // EvictionPolicy is how items should be removed when the cache is full. + EvictionPolicy EvictionPolicy + // TTL is how long items should remain in the cache + TTL time.Duration + + // historical is whether the cache will cache a single value for each key + // (false) or whether it will cache a history of values for each key (true). + historical bool + // pruneOlderThan is the number of past blocks for which to keep historical + // values. If 0, no historical pruning is performed. + pruneOlderThan int64 +} + +// CacheOption defines a function that configures a CacheConfig +type CacheOption func(*CacheConfig) + +// HistoricalCacheConfig extends the basic CacheConfig with historical settings. +type HistoricalCacheConfig struct { + CacheConfig + + // MaxHeightsPerKey is the maximum number of different heights to store per key + MaxHeightsPerKey int + // PruneOlderThan specifies how many blocks back to maintain in history + // If 0, no historical pruning is performed + PruneOlderThan int64 +} + +// WithHistoricalMode enables historical caching with the given pruneOlderThan +// configuration, if 0 no historical pruning is performed. +func WithHistoricalMode(pruneOlderThan int64) CacheOption { + return func(cfg *CacheConfig) { + cfg.historical = true + cfg.pruneOlderThan = pruneOlderThan + } +} + +// WithMaxKeys sets the maximum number of distinct key/value pairs the cache will +// hold before evicting according to the configured eviction policy. 
+func WithMaxKeys(size int64) CacheOption { + return func(cfg *CacheConfig) { + cfg.MaxKeys = size + } +} + +// WithEvictionPolicy sets the eviction policy +func WithEvictionPolicy(policy EvictionPolicy) CacheOption { + return func(cfg *CacheConfig) { + cfg.EvictionPolicy = policy + } +} + +// WithTTL sets the time-to-live for cache entries +func WithTTL(ttl time.Duration) CacheOption { + return func(cfg *CacheConfig) { + cfg.TTL = ttl + } +} diff --git a/pkg/client/query/cache/errors.go b/pkg/client/query/cache/errors.go new file mode 100644 index 000000000..10e017568 --- /dev/null +++ b/pkg/client/query/cache/errors.go @@ -0,0 +1,10 @@ +package cache + +import "cosmossdk.io/errors" + +const codesace = "client/query/cache" + +var ( + ErrCacheMiss = errors.Register(codesace, 1, "cache miss") + ErrHistoricalModeNotEnabled = errors.Register(codesace, 2, "historical mode not enabled") +) diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go new file mode 100644 index 000000000..06732f55a --- /dev/null +++ b/pkg/client/query/cache/memory.go @@ -0,0 +1,245 @@ +package cache + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/pokt-network/poktroll/pkg/client" +) + +var ( + _ client.QueryCache[any] = (*InMemoryCache[any])(nil) + _ client.HistoricalQueryCache[any] = (*InMemoryCache[any])(nil) +) + +// InMemoryCache provides a concurrency-safe in-memory cache implementation with +// optional historical value support. +type InMemoryCache[T any] struct { + config CacheConfig + latestHeight atomic.Int64 + + itemsMu sync.RWMutex + // items type depends on historical mode: + // | historical mode | type | + // | --------------- | --------------------------------------- | + // | false | map[string]cacheItem[T] | + // | true | map[string]map[int64]heightCacheItem[T] | + items map[string]any +} + +// cacheItem wraps cached values with metadata +type cacheItem[T any] struct { + value T + timestamp time.Time +} + +// heightCacheItem is used when the cache is in historical mode +type heightCacheItem[T any] struct { + value T + timestamp time.Time +} + +// NewInMemoryCache creates a new cache with the given configuration +func NewInMemoryCache[T any](opts ...CacheOption) *InMemoryCache[T] { + config := CacheConfig{ + EvictionPolicy: FirstInFirstOut, + } + + for _, opt := range opts { + opt(&config) + } + + return &InMemoryCache[T]{ + items: make(map[string]interface{}), + config: config, + } +} + +// Get retrieves an item from the cache +func (c *InMemoryCache[T]) Get(key string) (T, error) { + if c.config.historical { + return c.GetAtHeight(key, c.latestHeight.Load()) + } + + c.itemsMu.RLock() + defer c.itemsMu.RUnlock() + + var zero T + + item, exists := c.items[key] + if !exists { + return zero, ErrCacheMiss + } + + cItem := item.(cacheItem[T]) + if c.config.TTL > 0 && time.Since(cItem.timestamp) > c.config.TTL { + // TODO_QUESTION: should we prune here? 
+ return zero, ErrCacheMiss + } + + return cItem.value, nil +} + +// GetAtHeight retrieves an item from the cache at or before the specified height +func (c *InMemoryCache[T]) GetAtHeight(key string, height int64) (T, error) { + var zero T + + if !c.config.historical { + return zero, ErrHistoricalModeNotEnabled + } + + c.itemsMu.RLock() + defer c.itemsMu.RUnlock() + + heightMap, exists := c.items[key] + if !exists { + return zero, ErrCacheMiss + } + + versions := heightMap.(map[int64]heightCacheItem[T]) + var nearestHeight int64 = -1 + for h := range versions { + if h <= height && h > nearestHeight { + nearestHeight = h + } + } + + if nearestHeight == -1 { + return zero, ErrCacheMiss + } + + item := versions[nearestHeight] + if c.config.TTL > 0 && time.Since(item.timestamp) > c.config.TTL { + return zero, ErrCacheMiss + } + + return item.value, nil +} + +// Set adds or updates an item in the cache +func (c *InMemoryCache[T]) Set(key string, value T) error { + if c.config.historical { + return c.SetAtHeight(key, value, c.latestHeight.Load()) + } + + if c.config.MaxKeys > 0 && int64(len(c.items)) >= c.config.MaxKeys { + c.evict() + } + + c.itemsMu.Lock() + defer c.itemsMu.Unlock() + + c.items[key] = cacheItem[T]{ + value: value, + timestamp: time.Now(), + } + + return nil +} + +// SetAtHeight adds or updates an item in the cache at a specific height +func (c *InMemoryCache[T]) SetAtHeight(key string, value T, height int64) error { + if !c.config.historical { + return ErrHistoricalModeNotEnabled + } + + // Update latest height if this is newer + latestHeight := c.latestHeight.Load() + if height > latestHeight { + // NB: Only update if c.latestHeight hasn't changed since we loaded it above. + c.latestHeight.CompareAndSwap(latestHeight, height) + } + + c.itemsMu.Lock() + defer c.itemsMu.Unlock() + + var history map[int64]heightCacheItem[T] + if existing, exists := c.items[key]; exists { + history = existing.(map[int64]heightCacheItem[T]) + } else { + history = make(map[int64]heightCacheItem[T]) + c.items[key] = history + } + + // Prune old heights if configured + if c.config.pruneOlderThan > 0 { + for h := range history { + if height-h > c.config.pruneOlderThan { + delete(history, h) + } + } + } + + history[height] = heightCacheItem[T]{ + value: value, + timestamp: time.Now(), + } + + return nil +} + +// Delete removes an item from the cache. 
+func (c *InMemoryCache[T]) Delete(key string) { + c.itemsMu.Lock() + defer c.itemsMu.Unlock() + + delete(c.items, key) +} + +// Clear removes all items from the cache +func (c *InMemoryCache[T]) Clear() { + c.itemsMu.Lock() + defer c.itemsMu.Unlock() + + c.items = make(map[string]interface{}) + c.latestHeight.Store(0) +} + +// evict removes one item according to the configured eviction policy +func (c *InMemoryCache[T]) evict() { + switch c.config.EvictionPolicy { + case FirstInFirstOut: + var oldestKey string + var oldestTime time.Time + first := true + + for key, item := range c.items { + var itemTime time.Time + if c.config.historical { + versions := item.(map[int64]heightCacheItem[T]) + for _, v := range versions { + if itemTime.IsZero() || v.timestamp.Before(itemTime) { + itemTime = v.timestamp + } + } + } else { + itemTime = item.(cacheItem[T]).timestamp + } + + if first || itemTime.Before(oldestTime) { + oldestKey = key + oldestTime = itemTime + first = false + } + } + delete(c.items, oldestKey) + + case LeastRecentlyUsed: + // TODO: Implement LRU eviction + // This will require tracking access times + panic("LRU eviction not implemented") + + case LeastFrequentlyUsed: + // TODO: Implement LFU eviction + // This will require tracking access times + panic("LFU eviction not implemented") + + default: + // Default to FIFO if policy not recognized + for key := range c.items { + delete(c.items, key) + return + } + } +} diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go new file mode 100644 index 000000000..ae8b6a92f --- /dev/null +++ b/pkg/client/query/cache/memory_test.go @@ -0,0 +1,343 @@ +package cache + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestInMemoryCache_NonHistorical tests the basic cache functionality without historical mode +func TestInMemoryCache_NonHistorical(t *testing.T) { + t.Run("basic operations", func(t *testing.T) { + cache := NewInMemoryCache[string]() + + // Test Set and Get + err := cache.Set("key1", "value1") + require.NoError(t, err) + val, err := cache.Get("key1") + require.NoError(t, err) + require.Equal(t, "value1", val) + + // Test missing key + _, err = cache.Get("nonexistent") + require.ErrorIs(t, err, ErrCacheMiss) + + // Test Delete + cache.Delete("key1") + _, err = cache.Get("key1") + require.ErrorIs(t, err, ErrCacheMiss) + + // Test Clear + err = cache.Set("key2", "value2") + require.NoError(t, err) + cache.Clear() + _, err = cache.Get("key2") + require.ErrorIs(t, err, ErrCacheMiss) + }) + + t.Run("TTL expiration", func(t *testing.T) { + cache := NewInMemoryCache[string]( + WithTTL(100 * time.Millisecond), + ) + + err := cache.Set("key", "value") + require.NoError(t, err) + + // Value should be available immediately + val, err := cache.Get("key") + require.NoError(t, err) + require.Equal(t, "value", val) + + // Wait for TTL to expire + time.Sleep(150 * time.Millisecond) + + // Value should now be expired + _, err = cache.Get("key") + require.ErrorIs(t, err, ErrCacheMiss) + }) + + t.Run("max size eviction", func(t *testing.T) { + cache := NewInMemoryCache[string]( + WithMaxKeys(2), + WithEvictionPolicy(FirstInFirstOut), + ) + + // Add items up to max size + err := cache.Set("key1", "value1") + require.NoError(t, err) + err = cache.Set("key2", "value2") + require.NoError(t, err) + + // Add one more item, should trigger eviction + err = cache.Set("key3", "value3") + require.NoError(t, err) + + // First item should be evicted + _, err = 
cache.Get("key1") + require.ErrorIs(t, err, ErrCacheMiss) + + // Other items should still be present + val, err := cache.Get("key2") + require.NoError(t, err) + require.Equal(t, "value2", val) + + val, err = cache.Get("key3") + require.NoError(t, err) + require.Equal(t, "value3", val) + }) +} + +// TestInMemoryCache_Historical tests the historical mode functionality +func TestInMemoryCache_Historical(t *testing.T) { + t.Run("basic historical operations", func(t *testing.T) { + cache := NewInMemoryCache[string]( + WithHistoricalMode(100), + ) + + // Test SetAtHeight and GetAtHeight + err := cache.SetAtHeight("key", "value1", 10) + require.NoError(t, err) + err = cache.SetAtHeight("key", "value2", 20) + require.NoError(t, err) + + // Test getting exact heights + val, err := cache.GetAtHeight("key", 10) + require.NoError(t, err) + require.Equal(t, "value1", val) + + val, err = cache.GetAtHeight("key", 20) + require.NoError(t, err) + require.Equal(t, "value2", val) + + // Test getting intermediate height (should return nearest lower height) + val, err = cache.GetAtHeight("key", 15) + require.NoError(t, err) + require.Equal(t, "value1", val) + + // Test getting height before first entry + _, err = cache.GetAtHeight("key", 5) + require.ErrorIs(t, err, ErrCacheMiss) + + // Test getting height after last entry + val, err = cache.GetAtHeight("key", 25) + require.NoError(t, err) + require.Equal(t, "value2", val) + }) + + t.Run("historical TTL expiration", func(t *testing.T) { + cache := NewInMemoryCache[string]( + WithHistoricalMode(100), + WithTTL(100*time.Millisecond), + ) + + err := cache.SetAtHeight("key", "value1", 10) + require.NoError(t, err) + + // Value should be available immediately + val, err := cache.GetAtHeight("key", 10) + require.NoError(t, err) + require.Equal(t, "value1", val) + + // Wait for TTL to expire + time.Sleep(150 * time.Millisecond) + + // Value should now be expired + _, err = cache.GetAtHeight("key", 10) + require.ErrorIs(t, err, ErrCacheMiss) + }) + + t.Run("pruning old heights", func(t *testing.T) { + cache := NewInMemoryCache[string]( + WithHistoricalMode(10), // Prune entries older than 10 blocks + ) + + // Add entries at different heights + err := cache.SetAtHeight("key", "value1", 10) + require.NoError(t, err) + err = cache.SetAtHeight("key", "value2", 20) + require.NoError(t, err) + err = cache.SetAtHeight("key", "value3", 30) + require.NoError(t, err) + + // Add a new entry that should trigger pruning + err = cache.SetAtHeight("key", "value4", 40) + require.NoError(t, err) + + // Entries more than 10 blocks old should be pruned + _, err = cache.GetAtHeight("key", 10) + require.ErrorIs(t, err, ErrCacheMiss) + _, err = cache.GetAtHeight("key", 20) + require.ErrorIs(t, err, ErrCacheMiss) + + // Recent entries should still be available + val, err := cache.GetAtHeight("key", 30) + require.NoError(t, err) + require.Equal(t, "value3", val) + + val, err = cache.GetAtHeight("key", 40) + require.NoError(t, err) + require.Equal(t, "value4", val) + }) + + t.Run("non-historical operations on historical cache", func(t *testing.T) { + cache := NewInMemoryCache[string]( + WithHistoricalMode(100), + ) + + // Set some historical values + err := cache.SetAtHeight("key", "value1", 10) + require.NoError(t, err) + err = cache.SetAtHeight("key", "value2", 20) + require.NoError(t, err) + + // Regular Set should work with latest height + err = cache.Set("key", "value3") + require.NoError(t, err) + + // Regular Get should return the latest value + val, err := cache.Get("key") + 
require.NoError(t, err) + require.Equal(t, "value3", val) + + // Delete should remove all historical values + cache.Delete("key") + _, err = cache.GetAtHeight("key", 10) + require.ErrorIs(t, err, ErrCacheMiss) + _, err = cache.GetAtHeight("key", 20) + require.ErrorIs(t, err, ErrCacheMiss) + _, err = cache.Get("key") + require.ErrorIs(t, err, ErrCacheMiss) + }) +} + +// TestInMemoryCache_ErrorCases tests various error conditions +func TestInMemoryCache_ErrorCases(t *testing.T) { + t.Run("historical operations on non-historical cache", func(t *testing.T) { + cache := NewInMemoryCache[string]() + + // Attempting historical operations should return error + err := cache.SetAtHeight("key", "value", 10) + require.ErrorIs(t, err, ErrHistoricalModeNotEnabled) + + _, err = cache.GetAtHeight("key", 10) + require.ErrorIs(t, err, ErrHistoricalModeNotEnabled) + }) + + t.Run("zero values", func(t *testing.T) { + cache := NewInMemoryCache[string]() + + // Test with empty key + err := cache.Set("", "value") + require.NoError(t, err) + val, err := cache.Get("") + require.NoError(t, err) + require.Equal(t, "value", val) + + // Test with empty value + err = cache.Set("key", "") + require.NoError(t, err) + val, err = cache.Get("key") + require.NoError(t, err) + require.Equal(t, "", val) + }) +} + +// TestInMemoryCache_ConcurrentAccess tests thread safety of the cache +func TestInMemoryCache_ConcurrentAccess(t *testing.T) { + t.Run("concurrent access non-historical", func(t *testing.T) { + cache := NewInMemoryCache[int]() + const numGoroutines = 10 + const numOperations = 100 + + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(routineID int) { + defer wg.Done() + for j := 0; j < numOperations; j++ { + // Check for timeout + select { + case <-ctx.Done(): + t.Errorf("test timed out: %v", ctx.Err()) + return + default: + key := "key" + err := cache.Set(key, j) + require.NoError(t, err) + _, _ = cache.Get(key) + } + } + }(i) + } + + // Wait with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + t.Errorf("test timed out waiting for goroutines to complete: %v", ctx.Err()) + case <-done: + // Test completed successfully + } + }) + + t.Run("concurrent access historical", func(t *testing.T) { + cache := NewInMemoryCache[int]( + WithHistoricalMode(100), + ) + const numGoroutines = 10 + const numOpsPerGoRoutine = 100 + + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(routineID int) { + defer wg.Done() + for j := 0; j < numOpsPerGoRoutine; j++ { + // Check for timeout + select { + case <-ctx.Done(): + t.Errorf("test timed out: %v", ctx.Err()) + return + default: + key := "key" + err := cache.SetAtHeight(key, j, int64(j)) + require.NoError(t, err) + _, _ = cache.GetAtHeight(key, int64(j)) + } + } + }(i) + } + + // Wait with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + t.Errorf("test timed out waiting for goroutines to complete: %v", ctx.Err()) + case <-done: + // Test completed successfully + } + }) +} From 23cc94a8f7861c70933e78972ca8acb9831484b1 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 12 Dec 2024 10:52:07 +0100 
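For orientation, the following is a minimal caller-side sketch of the API introduced in PATCH 04 above (hypothetical example code, not part of any patch; the import path is assumed from the pkg/client/query/cache location shown in the diff, and the keys, values, and heights are illustrative):

    package main

    import (
        "fmt"
        "time"

        "github.com/pokt-network/poktroll/pkg/client/query/cache"
    )

    func main() {
        // Non-historical cache: one value per key, FIFO eviction, 1-minute TTL.
        c := cache.NewInMemoryCache[string](
            cache.WithMaxKeys(100),
            cache.WithEvictionPolicy(cache.FirstInFirstOut),
            cache.WithTTL(time.Minute),
        )
        _ = c.Set("app/pokt1abc", "cached-response")
        if v, err := c.Get("app/pokt1abc"); err == nil {
            fmt.Println(v)
        }

        // Historical cache: a history of values per key, pruned beyond 100 blocks.
        h := cache.NewInMemoryCache[string](cache.WithHistoricalMode(100))
        _ = h.SetAtHeight("param", "v1", 10)
        _ = h.SetAtHeight("param", "v2", 20)
        // v is "v1": the nearest cached value at or below height 15.
        v, err := h.GetAtHeight("param", 15)
        fmt.Println(v, err)
    }

Note that values older than the configured TTL are reported as cache misses rather than being eagerly evicted, and a full cache evicts one entry per Set according to the configured policy.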
Subject: [PATCH 05/12] chore: self-review improvements --- pkg/client/interface.go | 7 +- pkg/client/query/cache/config.go | 51 ++++----- pkg/client/query/cache/memory.go | 148 +++++++++++++++++--------- pkg/client/query/cache/memory_test.go | 24 ++--- 4 files changed, 140 insertions(+), 90 deletions(-) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index fdbc3ef31..cd637be2e 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -361,7 +361,9 @@ type BankQueryClient interface { GetBalance(ctx context.Context, address string) (*cosmostypes.Coin, error) } -// QueryCache handles a single type of cached data +// QueryCache is a key/value store style interface for a cache of a single type. +// It is intended to be used to cache query responses (or derivatives thereof), +// where each key uniquely indexes its most recent value. type QueryCache[T any] interface { Get(key string) (T, error) Set(key string, value T) error @@ -369,7 +371,8 @@ type QueryCache[T any] interface { Clear() } -// HistoricalQueryCache extends QueryCache to support historical values at different heights +// HistoricalQueryCache extends QueryCache to support getting and setting values +// at multiple heights for a given key. type HistoricalQueryCache[T any] interface { QueryCache[T] // GetAtHeight retrieves the nearest value <= the specified height diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index 10153f139..cc71900dc 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -4,7 +4,7 @@ import ( "time" ) -// EvictionPolicy determines how items are removed when cache is full. +// EvictionPolicy determines how items are removed when number of keys in the cache reaches MaxKeys. type EvictionPolicy int64 const ( @@ -13,29 +13,32 @@ const ( LeastFrequentlyUsed ) -// CacheConfig is the configuration options for a cache. -type CacheConfig struct { - // MaxKeys is the maximum number of items the cache can hold. - MaxKeys int64 - // EvictionPolicy is how items should be removed when the cache is full. +// queryCacheConfig is the configuration for query caches. It is intended to be +// configured via QueryCacheOptionFn functions. +type queryCacheConfig struct { + // MaxKeys is the maximum number of items (key/value pairs) the cache can + // hold before it starts evicting. + MaxKeys int64 EvictionPolicy EvictionPolicy - // TTL is how long items should remain in the cache + // TTL is how long items should remain in the cache. Items older than the TTL + // MAY not be evicted but SHOULD not be considered as cache hits. TTL time.Duration - // historical is whether the cache will cache a single value for each key - // (false) or whether it will cache a history of values for each key (true). + // historical determines whether the cache will cache a single value for each + // key (false), or whether it will cache a history of values for each key (true). historical bool // pruneOlderThan is the number of past blocks for which to keep historical - // values. If 0, no historical pruning is performed. + // values. If 0, no historical pruning is performed. It only applies when + // historical is true. pruneOlderThan int64 } -// CacheOption defines a function that configures a CacheConfig -type CacheOption func(*CacheConfig) +// QueryCacheOptionFn defines a function that configures a queryCacheConfig +type QueryCacheOptionFn func(*queryCacheConfig) -// HistoricalCacheConfig extends the basic CacheConfig with historical settings. 
-type HistoricalCacheConfig struct { - CacheConfig +// HistoricalQueryCacheConfig extends the basic queryCacheConfig with historical settings. +type HistoricalQueryCacheConfig struct { + queryCacheConfig // MaxHeightsPerKey is the maximum number of different heights to store per key MaxHeightsPerKey int @@ -46,8 +49,8 @@ type HistoricalCacheConfig struct { // WithHistoricalMode enables historical caching with the given pruneOlderThan // configuration, if 0 no historical pruning is performed. -func WithHistoricalMode(pruneOlderThan int64) CacheOption { - return func(cfg *CacheConfig) { +func WithHistoricalMode(pruneOlderThan int64) QueryCacheOptionFn { + return func(cfg *queryCacheConfig) { cfg.historical = true cfg.pruneOlderThan = pruneOlderThan } @@ -55,22 +58,22 @@ func WithHistoricalMode(pruneOlderThan int64) CacheOption { // WithMaxKeys sets the maximum number of distinct key/value pairs the cache will // hold before evicting according to the configured eviction policy. -func WithMaxKeys(size int64) CacheOption { - return func(cfg *CacheConfig) { - cfg.MaxKeys = size +func WithMaxKeys(maxKeys int64) QueryCacheOptionFn { + return func(cfg *queryCacheConfig) { + cfg.MaxKeys = maxKeys } } // WithEvictionPolicy sets the eviction policy -func WithEvictionPolicy(policy EvictionPolicy) CacheOption { - return func(cfg *CacheConfig) { +func WithEvictionPolicy(policy EvictionPolicy) QueryCacheOptionFn { + return func(cfg *queryCacheConfig) { cfg.EvictionPolicy = policy } } // WithTTL sets the time-to-live for cache entries -func WithTTL(ttl time.Duration) CacheOption { - return func(cfg *CacheConfig) { +func WithTTL(ttl time.Duration) QueryCacheOptionFn { + return func(cfg *queryCacheConfig) { cfg.TTL = ttl } } diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index 06732f55a..edbfe1824 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -1,6 +1,7 @@ package cache import ( + "sort" "sync" "sync/atomic" "time" @@ -16,15 +17,15 @@ var ( // InMemoryCache provides a concurrency-safe in-memory cache implementation with // optional historical value support. type InMemoryCache[T any] struct { - config CacheConfig + config queryCacheConfig latestHeight atomic.Int64 itemsMu sync.RWMutex // items type depends on historical mode: - // | historical mode | type | - // | --------------- | --------------------------------------- | - // | false | map[string]cacheItem[T] | - // | true | map[string]map[int64]heightCacheItem[T] | + // | historical mode | type | + // | --------------- | ------------------------------ | + // | false | map[string]cacheItem[T] | + // | true | map[string]cacheItemHistory[T] | items map[string]any } @@ -34,15 +35,16 @@ type cacheItem[T any] struct { timestamp time.Time } -// heightCacheItem is used when the cache is in historical mode -type heightCacheItem[T any] struct { - value T - timestamp time.Time +type cacheItemHistory[T any] struct { + // sortedDescHeights is a list of the heights for which values are cached. + // It is sorted in descending order. 
+ sortedDescHeights []int64 + itemsByHeight map[int64]cacheItem[T] } // NewInMemoryCache creates a new cache with the given configuration -func NewInMemoryCache[T any](opts ...CacheOption) *InMemoryCache[T] { - config := CacheConfig{ +func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) *InMemoryCache[T] { + config := queryCacheConfig{ EvictionPolicy: FirstInFirstOut, } @@ -56,7 +58,10 @@ func NewInMemoryCache[T any](opts ...CacheOption) *InMemoryCache[T] { } } -// Get retrieves an item from the cache +// Get retrieves the value from the cache with the given key. If the cache is +// configured for historical mode, it will return the value at the latest **known** +// height, which is only updated on calls to SetAtHeight, and therefore is not +// guaranteed to be the current height. func (c *InMemoryCache[T]) Get(key string) (T, error) { if c.config.historical { return c.GetAtHeight(key, c.latestHeight.Load()) @@ -69,20 +74,25 @@ func (c *InMemoryCache[T]) Get(key string) (T, error) { item, exists := c.items[key] if !exists { - return zero, ErrCacheMiss + return zero, ErrCacheMiss.Wrapf("key: %s", key) } cItem := item.(cacheItem[T]) if c.config.TTL > 0 && time.Since(cItem.timestamp) > c.config.TTL { - // TODO_QUESTION: should we prune here? - return zero, ErrCacheMiss + // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; + // otherwise, the read lock would be insufficient. The value will be + // overwritten by the next call to Set(). + return zero, ErrCacheMiss.Wrapf("key: %s", key) } return cItem.value, nil } -// GetAtHeight retrieves an item from the cache at or before the specified height -func (c *InMemoryCache[T]) GetAtHeight(key string, height int64) (T, error) { +// GetAtHeight retrieves the value from the cache with the given key, at the given +// height. If a value is not found for that height, the value at the nearest previous +// height is returned. If the cache is not configured for historical mode, it returns +// an error. +func (c *InMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { var zero T if !c.config.historical { @@ -92,32 +102,44 @@ func (c *InMemoryCache[T]) GetAtHeight(key string, height int64) (T, error) { c.itemsMu.RLock() defer c.itemsMu.RUnlock() - heightMap, exists := c.items[key] + itemHistoryAny, exists := c.items[key] if !exists { - return zero, ErrCacheMiss + return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) } - versions := heightMap.(map[int64]heightCacheItem[T]) - var nearestHeight int64 = -1 - for h := range versions { - if h <= height && h > nearestHeight { - nearestHeight = h + itemHistory := itemHistoryAny.(cacheItemHistory[T]) + var nearestCachedHeight int64 = -1 + for _, cachedHeight := range itemHistory.sortedDescHeights { + if cachedHeight <= getHeight { + nearestCachedHeight = cachedHeight + // DEV_NOTE: Since the list is sorted in descending order, once we + // encounter a cachedHeight that is less than or equal to getHeight, + // all subsequent cachedHeights SHOULD also be less than or equal to + // getHeight. 
+ break } } - if nearestHeight == -1 { - return zero, ErrCacheMiss + if nearestCachedHeight == -1 { + return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) } - item := versions[nearestHeight] + item := itemHistory.itemsByHeight[nearestCachedHeight] if c.config.TTL > 0 && time.Since(item.timestamp) > c.config.TTL { - return zero, ErrCacheMiss + // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; + // otherwise, the read lock would be insufficient. The value will be pruned + // in the subsequent call to SetAtHeight() after c.config.pruneOlderThan + // blocks have elapsed. + return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) } return item.value, nil } -// Set adds or updates an item in the cache +// Set adds or updates the value in the cache for the given key. If the cache is +// configured for historical mode, it will store the value at the latest **known** +// height, which is only updated on calls to SetAtHeight, and therefore is not +// guaranteed to be the current height. func (c *InMemoryCache[T]) Set(key string, value T) error { if c.config.historical { return c.SetAtHeight(key, value, c.latestHeight.Load()) @@ -138,40 +160,65 @@ func (c *InMemoryCache[T]) Set(key string, value T) error { return nil } -// SetAtHeight adds or updates an item in the cache at a specific height -func (c *InMemoryCache[T]) SetAtHeight(key string, value T, height int64) error { +// SetAtHeight adds or updates the historical value in the cache for the given key, +// and at the given height. If the cache is not configured for historical mode, it +// returns an error. +func (c *InMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) error { if !c.config.historical { return ErrHistoricalModeNotEnabled } - // Update latest height if this is newer + // Update c.latestHeight if the given setHeight is newer (higher). latestHeight := c.latestHeight.Load() - if height > latestHeight { + if setHeight > latestHeight { // NB: Only update if c.latestHeight hasn't changed since we loaded it above. - c.latestHeight.CompareAndSwap(latestHeight, height) + c.latestHeight.CompareAndSwap(latestHeight, setHeight) } c.itemsMu.Lock() defer c.itemsMu.Unlock() - var history map[int64]heightCacheItem[T] - if existing, exists := c.items[key]; exists { - history = existing.(map[int64]heightCacheItem[T]) + // TODO_IN_THIS_COMMIT: refactor history to be a struct which includes sortedDescHeights... + var itemHistory cacheItemHistory[T] + if itemHistoryAny, exists := c.items[key]; exists { + itemHistory = itemHistoryAny.(cacheItemHistory[T]) } else { - history = make(map[int64]heightCacheItem[T]) - c.items[key] = history + itemsByHeight := make(map[int64]cacheItem[T]) + itemHistory = cacheItemHistory[T]{ + sortedDescHeights: make([]int64, 0), + itemsByHeight: itemsByHeight, + } } - // Prune old heights if configured + // Update sortedDescHeights and ensure the list is sorted in descending order. + if _, setHeightExists := itemHistory.itemsByHeight[setHeight]; !setHeightExists { + itemHistory.sortedDescHeights = append(itemHistory.sortedDescHeights, setHeight) + sort.Slice(itemHistory.sortedDescHeights, func(i, j int) bool { + return itemHistory.sortedDescHeights[i] > itemHistory.sortedDescHeights[j] + }) + } + + c.items[key] = itemHistory + + // Prune historical values for this key, where the setHeight + // is oder than the configured pruneOlderThan. 
if c.config.pruneOlderThan > 0 { - for h := range history { - if height-h > c.config.pruneOlderThan { - delete(history, h) + for heightIdx := int64(len(itemHistory.sortedDescHeights)) - 1; heightIdx >= 0; heightIdx-- { + cachedHeight := itemHistory.sortedDescHeights[heightIdx] + + // DEV_NOTE: Since the list is sorted, and we're iterating from highest (youngest) + // to lowest (oldest) height, once we encounter a cachedHeight that is older than the + // configured pruneOlderThan, ALL subsequent heights SHOULD also be older than the + // configured pruneOlderThan. + if setHeight-cachedHeight < c.config.pruneOlderThan { + break } + + delete(itemHistory.itemsByHeight, setHeight) } } - history[height] = heightCacheItem[T]{ + itemHistory.itemsByHeight[setHeight] = cacheItem[T]{ value: value, timestamp: time.Now(), } @@ -187,7 +234,7 @@ func (c *InMemoryCache[T]) Delete(key string) { delete(c.items, key) } -// Clear removes all items from the cache +// Clear removes all items from the cache. func (c *InMemoryCache[T]) Clear() { c.itemsMu.Lock() defer c.itemsMu.Unlock() @@ -196,7 +243,8 @@ func (c *InMemoryCache[T]) Clear() { c.latestHeight.Store(0) } -// evict removes one item according to the configured eviction policy +// evict removes one item from the cache, to make space for a new one, +// according to the configured eviction policy func (c *InMemoryCache[T]) evict() { switch c.config.EvictionPolicy { case FirstInFirstOut: @@ -207,8 +255,8 @@ func (c *InMemoryCache[T]) evict() { for key, item := range c.items { var itemTime time.Time if c.config.historical { - versions := item.(map[int64]heightCacheItem[T]) - for _, v := range versions { + itemHistory := item.(cacheItemHistory[T]) + for _, v := range itemHistory.itemsByHeight { if itemTime.IsZero() || v.timestamp.Before(itemTime) { itemTime = v.timestamp } @@ -226,12 +274,12 @@ func (c *InMemoryCache[T]) evict() { delete(c.items, oldestKey) case LeastRecentlyUsed: - // TODO: Implement LRU eviction + // TODO_IMPROVE: Implement LRU eviction // This will require tracking access times panic("LRU eviction not implemented") case LeastFrequentlyUsed: - // TODO: Implement LFU eviction + // TODO_IMPROVE: Implement LFU eviction // This will require tracking access times panic("LFU eviction not implemented") diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go index ae8b6a92f..6bf6672cb 100644 --- a/pkg/client/query/cache/memory_test.go +++ b/pkg/client/query/cache/memory_test.go @@ -260,13 +260,11 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { - go func(routineID int) { + go func() { defer wg.Done() for j := 0; j < numOperations; j++ { - // Check for timeout select { case <-ctx.Done(): - t.Errorf("test timed out: %v", ctx.Err()) return default: key := "key" @@ -275,10 +273,10 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { _, _ = cache.Get(key) } } - }(i) + }() } - // Wait with timeout + // Wait for waitgroup with timeout. 
done := make(chan struct{}) go func() { wg.Wait() @@ -287,9 +285,9 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { select { case <-ctx.Done(): - t.Errorf("test timed out waiting for goroutines to complete: %v", ctx.Err()) + t.Errorf("test timed out waiting for workgroup to complete: %+v", ctx.Err()) case <-done: - // Test completed successfully + t.Log("test completed successfully") } }) @@ -308,13 +306,11 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { - go func(routineID int) { + go func() { defer wg.Done() for j := 0; j < numOpsPerGoRoutine; j++ { - // Check for timeout select { case <-ctx.Done(): - t.Errorf("test timed out: %v", ctx.Err()) return default: key := "key" @@ -323,10 +319,10 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { _, _ = cache.GetAtHeight(key, int64(j)) } } - }(i) + }() } - // Wait with timeout + // Wait for waitgroup with timeout. done := make(chan struct{}) go func() { wg.Wait() @@ -335,9 +331,9 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { select { case <-ctx.Done(): - t.Errorf("test timed out waiting for goroutines to complete: %v", ctx.Err()) + t.Errorf("test timed out waiting for goroutines to complete: %+v", ctx.Err()) case <-done: - // Test completed successfully + t.Log("test completed successfully") } }) } From 366ab1d22423f3da82eb8740ec803570c150ec54 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 12 Dec 2024 11:07:28 +0100 Subject: [PATCH 06/12] chore: self-review improvements --- pkg/client/query/cache/config.go | 20 +++++--------------- pkg/client/query/cache/memory.go | 21 +++++++++++++++------ 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index cc71900dc..ee5335263 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -33,22 +33,11 @@ type queryCacheConfig struct { pruneOlderThan int64 } -// QueryCacheOptionFn defines a function that configures a queryCacheConfig +// QueryCacheOptionFn is a function which receives a queryCacheConfig for configuration. type QueryCacheOptionFn func(*queryCacheConfig) -// HistoricalQueryCacheConfig extends the basic queryCacheConfig with historical settings. -type HistoricalQueryCacheConfig struct { - queryCacheConfig - - // MaxHeightsPerKey is the maximum number of different heights to store per key - MaxHeightsPerKey int - // PruneOlderThan specifies how many blocks back to maintain in history - // If 0, no historical pruning is performed - PruneOlderThan int64 -} - // WithHistoricalMode enables historical caching with the given pruneOlderThan -// configuration, if 0 no historical pruning is performed. +// configuration; if 0, no historical pruning is performed. func WithHistoricalMode(pruneOlderThan int64) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { cfg.historical = true @@ -64,14 +53,15 @@ func WithMaxKeys(maxKeys int64) QueryCacheOptionFn { } } -// WithEvictionPolicy sets the eviction policy +// WithEvictionPolicy sets the eviction policy. func WithEvictionPolicy(policy EvictionPolicy) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { cfg.EvictionPolicy = policy } } -// WithTTL sets the time-to-live for cache entries +// WithTTL sets the time-to-live for cached items. Items older than the TTL +// MAY not be evicted but SHOULD not be considered as cache hits. 
func WithTTL(ttl time.Duration) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { cfg.TTL = ttl diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index edbfe1824..f7ac5f84e 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -12,6 +12,13 @@ import ( var ( _ client.QueryCache[any] = (*InMemoryCache[any])(nil) _ client.HistoricalQueryCache[any] = (*InMemoryCache[any])(nil) + + DefaultQueryCacheConfig = queryCacheConfig{ + EvictionPolicy: FirstInFirstOut, + // TODO_MAINNET(@bryanchriswhite): Consider how we can "guarantee" good + // alignment between the TTL and the block production rate. + TTL: time.Minute, + } ) // InMemoryCache provides a concurrency-safe in-memory cache implementation with @@ -29,12 +36,16 @@ type InMemoryCache[T any] struct { items map[string]any } -// cacheItem wraps cached values with metadata +// cacheItem wraps cached values with a timestamp for later comparison against +// the configured TTL. type cacheItem[T any] struct { value T timestamp time.Time } +// cacheItemHistory stores cachedItems by height and maintains a sorted list of +// heights for which cached items exist. This list is sorted in descending order +// to improve performance characteristics by positively correlating index with age. type cacheItemHistory[T any] struct { // sortedDescHeights is a list of the heights for which values are cached. // It is sorted in descending order. @@ -42,11 +53,10 @@ type cacheItemHistory[T any] struct { itemsByHeight map[int64]cacheItem[T] } -// NewInMemoryCache creates a new cache with the given configuration +// NewInMemoryCache creates a new InMemoryCache with the configuration generated +// by the given option functions. func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) *InMemoryCache[T] { - config := queryCacheConfig{ - EvictionPolicy: FirstInFirstOut, - } + config := DefaultQueryCacheConfig for _, opt := range opts { opt(&config) @@ -178,7 +188,6 @@ func (c *InMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) err c.itemsMu.Lock() defer c.itemsMu.Unlock() - // TODO_IN_THIS_COMMIT: refactor history to be a struct which includes sortedDescHeights... var itemHistory cacheItemHistory[T] if itemHistoryAny, exists := c.items[key]; exists { itemHistory = itemHistoryAny.(cacheItemHistory[T]) From 4632c747560b8b846008a2f7bca551bed4817aa2 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 12 Dec 2024 12:19:59 +0100 Subject: [PATCH 07/12] fix: historical pruning --- pkg/client/query/cache/memory.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index f7ac5f84e..bad26f09a 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -207,23 +207,23 @@ func (c *InMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) err }) } - c.items[key] = itemHistory - // Prune historical values for this key, where the setHeight // is oder than the configured pruneOlderThan. 
if c.config.pruneOlderThan > 0 { - for heightIdx := int64(len(itemHistory.sortedDescHeights)) - 1; heightIdx >= 0; heightIdx-- { + lenCachedHeights := int64(len(itemHistory.sortedDescHeights)) + for heightIdx := lenCachedHeights - 1; heightIdx >= 0; heightIdx-- { cachedHeight := itemHistory.sortedDescHeights[heightIdx] - // DEV_NOTE: Since the list is sorted, and we're iterating from highest (youngest) - // to lowest (oldest) height, once we encounter a cachedHeight that is older than the - // configured pruneOlderThan, ALL subsequent heights SHOULD also be older than the - // configured pruneOlderThan. - if setHeight-cachedHeight < c.config.pruneOlderThan { + // DEV_NOTE: Since the list is sorted, and we're iterating from lowest + // (oldest) to highest (youngest) height, once we encounter a cachedHeight + // that is younger than the configured pruneOlderThan, ALL subsequent + // heights SHOULD also be younger than the configured pruneOlderThan. + if setHeight-cachedHeight <= c.config.pruneOlderThan { + itemHistory.sortedDescHeights = itemHistory.sortedDescHeights[:heightIdx+1] break } - delete(itemHistory.itemsByHeight, setHeight) + delete(itemHistory.itemsByHeight, cachedHeight) } } @@ -232,6 +232,8 @@ func (c *InMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) err timestamp: time.Now(), } + c.items[key] = itemHistory + return nil } From e48a2f260a10dc9763c923f81d0f60d7cd8ac607 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 16 Dec 2024 10:20:38 +0100 Subject: [PATCH 08/12] chore: review feedback improvements Co-authored-by: Daniel Olshansky --- pkg/client/interface.go | 2 +- pkg/client/query/cache/config.go | 8 ++++---- pkg/client/query/cache/memory.go | 9 ++++++--- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index cd637be2e..1f56a8870 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -363,7 +363,7 @@ type BankQueryClient interface { // QueryCache is a key/value store style interface for a cache of a single type. // It is intended to be used to cache query responses (or derivatives thereof), -// where each key uniquely indexes its most recent value. +// where each key uniquely indexes the most recent query response. type QueryCache[T any] interface { Get(key string) (T, error) Set(key string, value T) error diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index ee5335263..b5da6752d 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -13,8 +13,8 @@ const ( LeastFrequentlyUsed ) -// queryCacheConfig is the configuration for query caches. It is intended to be -// configured via QueryCacheOptionFn functions. +// queryCacheConfig is the configuration for query caches. +// It is intended to be configured via QueryCacheOptionFn functions. type queryCacheConfig struct { // MaxKeys is the maximum number of items (key/value pairs) the cache can // hold before it starts evicting. @@ -24,8 +24,8 @@ type queryCacheConfig struct { // MAY not be evicted but SHOULD not be considered as cache hits. TTL time.Duration - // historical determines whether the cache will cache a single value for each - // key (false), or whether it will cache a history of values for each key (true). + // historical determines whether each key will point to a single values (false) + // or a history (i.e. reverse chronological list) of values (true). 
historical bool // pruneOlderThan is the number of past blocks for which to keep historical // values. If 0, no historical pruning is performed. It only applies when diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index bad26f09a..5ffe543d9 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -16,7 +16,8 @@ var ( DefaultQueryCacheConfig = queryCacheConfig{ EvictionPolicy: FirstInFirstOut, // TODO_MAINNET(@bryanchriswhite): Consider how we can "guarantee" good - // alignment between the TTL and the block production rate. + // alignment between the TTL and the block production rate, + // by accessing onchain block times directly. TTL: time.Minute, } ) @@ -71,7 +72,7 @@ func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) *InMemoryCache[T] { // Get retrieves the value from the cache with the given key. If the cache is // configured for historical mode, it will return the value at the latest **known** // height, which is only updated on calls to SetAtHeight, and therefore is not -// guaranteed to be the current height. +// guaranteed to be the current height w.r.t the blockchain. func (c *InMemoryCache[T]) Get(key string) (T, error) { if c.config.historical { return c.GetAtHeight(key, c.latestHeight.Load()) @@ -88,7 +89,9 @@ func (c *InMemoryCache[T]) Get(key string) (T, error) { } cItem := item.(cacheItem[T]) - if c.config.TTL > 0 && time.Since(cItem.timestamp) > c.config.TTL { + isTTLEnabled := c.config.TTL > 0 + isCacheItemExpired := time.Since(cItem.timestamp) > c.config.TTL + if isTTLEnabled && isCacheItemExpired { // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; // otherwise, the read lock would be insufficient. The value will be // overwritten by the next call to Set(). From d5ce62f680d0e35d8f2cbc397b32621224ecac37 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 16 Dec 2024 12:22:51 +0100 Subject: [PATCH 09/12] chore: review feedback improvements --- pkg/client/query/cache/config.go | 52 +++-- pkg/client/query/cache/errors.go | 6 +- pkg/client/query/cache/memory.go | 289 +++++++++++++++----------- pkg/client/query/cache/memory_test.go | 57 +++-- 4 files changed, 247 insertions(+), 157 deletions(-) diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index b5da6752d..49d80435e 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -4,7 +4,7 @@ import ( "time" ) -// EvictionPolicy determines how items are removed when number of keys in the cache reaches MaxKeys. +// EvictionPolicy determines which items are removed when number of keys in the cache reaches maxKeys. type EvictionPolicy int64 const ( @@ -16,32 +16,52 @@ const ( // queryCacheConfig is the configuration for query caches. // It is intended to be configured via QueryCacheOptionFn functions. type queryCacheConfig struct { - // MaxKeys is the maximum number of items (key/value pairs) the cache can + // maxKeys is the maximum number of items (key/value pairs) the cache can // hold before it starts evicting. - MaxKeys int64 - EvictionPolicy EvictionPolicy - // TTL is how long items should remain in the cache. Items older than the TTL - // MAY not be evicted but SHOULD not be considered as cache hits. - TTL time.Duration + maxKeys int64 + // TODO_CONSIDERATION: + // + // maxValueSize is the maximum cumulative size of all values in the cache. + // maxValueSize int64 + // maxCacheSize is the maximum cumulative size of all keys AND values in the cache. 
+ // maxCacheSize int64 + + // evictionPolicy determines which items are removed when number of keys in the cache reaches maxKeys. + evictionPolicy EvictionPolicy + // ttl is how long items should remain in the cache. Items older than the ttl + // MAY NOT be evicted immediately, but are NEVER considered as cache hits. + ttl time.Duration // historical determines whether each key will point to a single values (false) // or a history (i.e. reverse chronological list) of values (true). historical bool - // pruneOlderThan is the number of past blocks for which to keep historical + // numHistoricalValues is the number of past blocks for which to keep historical // values. If 0, no historical pruning is performed. It only applies when // historical is true. - pruneOlderThan int64 + numHistoricalValues int64 } // QueryCacheOptionFn is a function which receives a queryCacheConfig for configuration. type QueryCacheOptionFn func(*queryCacheConfig) -// WithHistoricalMode enables historical caching with the given pruneOlderThan +// Validate ensures that the queryCacheConfig isn't configured with incompatible options. +func (cfg *queryCacheConfig) Validate() error { + switch cfg.evictionPolicy { + case FirstInFirstOut: + // TODO_IMPROVE: support LeastRecentlyUsed and LeastFrequentlyUsed policies. + default: + return ErrQueryCacheConfigValidation.Wrapf("eviction policy %d not imlemented", cfg.evictionPolicy) + } + + return nil +} + +// WithHistoricalMode enables historical caching with the given numHistoricalValues // configuration; if 0, no historical pruning is performed. -func WithHistoricalMode(pruneOlderThan int64) QueryCacheOptionFn { +func WithHistoricalMode(numHistoricalBlocks int64) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { cfg.historical = true - cfg.pruneOlderThan = pruneOlderThan + cfg.numHistoricalValues = numHistoricalBlocks } } @@ -49,21 +69,21 @@ func WithHistoricalMode(pruneOlderThan int64) QueryCacheOptionFn { // hold before evicting according to the configured eviction policy. func WithMaxKeys(maxKeys int64) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { - cfg.MaxKeys = maxKeys + cfg.maxKeys = maxKeys } } // WithEvictionPolicy sets the eviction policy. func WithEvictionPolicy(policy EvictionPolicy) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { - cfg.EvictionPolicy = policy + cfg.evictionPolicy = policy } } // WithTTL sets the time-to-live for cached items. Items older than the TTL -// MAY not be evicted but SHOULD not be considered as cache hits. +// MAY NOT be evicted immediately, but are NEVER considered as cache hits. 
func WithTTL(ttl time.Duration) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { - cfg.TTL = ttl + cfg.ttl = ttl } } diff --git a/pkg/client/query/cache/errors.go b/pkg/client/query/cache/errors.go index 10e017568..b80342999 100644 --- a/pkg/client/query/cache/errors.go +++ b/pkg/client/query/cache/errors.go @@ -5,6 +5,8 @@ import "cosmossdk.io/errors" const codesace = "client/query/cache" var ( - ErrCacheMiss = errors.Register(codesace, 1, "cache miss") - ErrHistoricalModeNotEnabled = errors.Register(codesace, 2, "historical mode not enabled") + ErrCacheMiss = errors.Register(codesace, 1, "cache miss") + ErrHistoricalModeNotEnabled = errors.Register(codesace, 2, "historical mode not enabled") + ErrQueryCacheConfigValidation = errors.Register(codesace, 3, "invalid query cache config") + ErrCacheInternal = errors.Register(codesace, 4, "cache internal error") ) diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index 5ffe543d9..414efb3a9 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -10,119 +10,119 @@ import ( ) var ( - _ client.QueryCache[any] = (*InMemoryCache[any])(nil) - _ client.HistoricalQueryCache[any] = (*InMemoryCache[any])(nil) + _ client.QueryCache[any] = (*inMemoryCache[any])(nil) + _ client.HistoricalQueryCache[any] = (*inMemoryCache[any])(nil) DefaultQueryCacheConfig = queryCacheConfig{ - EvictionPolicy: FirstInFirstOut, + evictionPolicy: FirstInFirstOut, // TODO_MAINNET(@bryanchriswhite): Consider how we can "guarantee" good // alignment between the TTL and the block production rate, // by accessing onchain block times directly. - TTL: time.Minute, + ttl: time.Minute, } ) -// InMemoryCache provides a concurrency-safe in-memory cache implementation with +// inMemoryCache provides a concurrency-safe in-memory cache implementation with // optional historical value support. -type InMemoryCache[T any] struct { +type inMemoryCache[T any] struct { config queryCacheConfig latestHeight atomic.Int64 - itemsMu sync.RWMutex - // items type depends on historical mode: - // | historical mode | type | - // | --------------- | ------------------------------ | - // | false | map[string]cacheItem[T] | - // | true | map[string]cacheItemHistory[T] | - items map[string]any + // valuesMu is used to protect values AND valueHistories from concurrent access. + valuesMu sync.RWMutex + // values holds the cached values in non-historical mode. + values map[string]cacheValue[T] + // valueHistories holds the cached historical values in historical mode. + valueHistories map[string]cacheValueHistory[T] } -// cacheItem wraps cached values with a timestamp for later comparison against +// cacheValue wraps cached values with a cachedAt for later comparison against // the configured TTL. -type cacheItem[T any] struct { - value T - timestamp time.Time +type cacheValue[T any] struct { + value T + cachedAt time.Time } -// cacheItemHistory stores cachedItems by height and maintains a sorted list of +// cacheValueHistory stores cachedItems by height and maintains a sorted list of // heights for which cached items exist. This list is sorted in descending order // to improve performance characteristics by positively correlating index with age. -type cacheItemHistory[T any] struct { +type cacheValueHistory[T any] struct { // sortedDescHeights is a list of the heights for which values are cached. // It is sorted in descending order. 
sortedDescHeights []int64 - itemsByHeight map[int64]cacheItem[T] + heightMap map[int64]cacheValue[T] } -// NewInMemoryCache creates a new InMemoryCache with the configuration generated +// NewInMemoryCache creates a new inMemoryCache with the configuration generated // by the given option functions. -func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) *InMemoryCache[T] { +func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) (*inMemoryCache[T], error) { config := DefaultQueryCacheConfig for _, opt := range opts { opt(&config) } - return &InMemoryCache[T]{ - items: make(map[string]interface{}), - config: config, + if err := config.Validate(); err != nil { + return nil, err } + + return &inMemoryCache[T]{ + values: make(map[string]cacheValue[T]), + valueHistories: make(map[string]cacheValueHistory[T]), + config: config, + }, nil } // Get retrieves the value from the cache with the given key. If the cache is // configured for historical mode, it will return the value at the latest **known** // height, which is only updated on calls to SetAtHeight, and therefore is not // guaranteed to be the current height w.r.t the blockchain. -func (c *InMemoryCache[T]) Get(key string) (T, error) { +func (c *inMemoryCache[T]) Get(key string) (T, error) { if c.config.historical { return c.GetAtHeight(key, c.latestHeight.Load()) } - c.itemsMu.RLock() - defer c.itemsMu.RUnlock() + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() var zero T - item, exists := c.items[key] + cachedItem, exists := c.values[key] if !exists { return zero, ErrCacheMiss.Wrapf("key: %s", key) } - cItem := item.(cacheItem[T]) - isTTLEnabled := c.config.TTL > 0 - isCacheItemExpired := time.Since(cItem.timestamp) > c.config.TTL + isTTLEnabled := c.config.ttl > 0 + isCacheItemExpired := time.Since(cachedItem.cachedAt) > c.config.ttl if isTTLEnabled && isCacheItemExpired { // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; // otherwise, the read lock would be insufficient. The value will be - // overwritten by the next call to Set(). + // overwritten by the next call to Set(). If usage is such that values + // aren't being subsequently set, maxKeys (if configured) will eventually + // cause the pruning of values with expired TTLs. return zero, ErrCacheMiss.Wrapf("key: %s", key) } - return cItem.value, nil + return cachedItem.value, nil } // GetAtHeight retrieves the value from the cache with the given key, at the given // height. If a value is not found for that height, the value at the nearest previous // height is returned. If the cache is not configured for historical mode, it returns // an error. 
-func (c *InMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { +func (c *inMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { var zero T if !c.config.historical { return zero, ErrHistoricalModeNotEnabled } - c.itemsMu.RLock() - defer c.itemsMu.RUnlock() - - itemHistoryAny, exists := c.items[key] - if !exists { - return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) - } + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() - itemHistory := itemHistoryAny.(cacheItemHistory[T]) + valueHistory := c.valueHistories[key] var nearestCachedHeight int64 = -1 - for _, cachedHeight := range itemHistory.sortedDescHeights { + for _, cachedHeight := range valueHistory.sortedDescHeights { if cachedHeight <= getHeight { nearestCachedHeight = cachedHeight // DEV_NOTE: Since the list is sorted in descending order, once we @@ -137,37 +137,47 @@ func (c *InMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) } - item := itemHistory.itemsByHeight[nearestCachedHeight] - if c.config.TTL > 0 && time.Since(item.timestamp) > c.config.TTL { + value, exists := valueHistory.heightMap[nearestCachedHeight] + if !exists { + return zero, ErrCacheInternal.Wrapf("failed to load historical value for key: %s, height: %d", key, getHeight) + } + + if c.config.ttl > 0 && time.Since(value.cachedAt) > c.config.ttl { // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; // otherwise, the read lock would be insufficient. The value will be pruned - // in the subsequent call to SetAtHeight() after c.config.pruneOlderThan - // blocks have elapsed. + // in the subsequent call to SetAtHeight() after c.config.numHistoricalValues + // blocks have elapsed. If usage is such that historical values aren't being + // subsequently set, numHistoricalBlocks (if configured) will eventually + // cause the pruning of historical values with expired TTLs. return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) } - return item.value, nil + return value.value, nil } // Set adds or updates the value in the cache for the given key. If the cache is // configured for historical mode, it will store the value at the latest **known** // height, which is only updated on calls to SetAtHeight, and therefore is not // guaranteed to be the current height. -func (c *InMemoryCache[T]) Set(key string, value T) error { +func (c *inMemoryCache[T]) Set(key string, value T) error { if c.config.historical { return c.SetAtHeight(key, value, c.latestHeight.Load()) } - if c.config.MaxKeys > 0 && int64(len(c.items)) >= c.config.MaxKeys { - c.evict() + isMaxKeysConfigured := c.config.maxKeys > 0 + cacheHasMaxKeys := int64(len(c.values)) >= c.config.maxKeys + if isMaxKeysConfigured && cacheHasMaxKeys { + if err := c.evict(); err != nil { + return err + } } - c.itemsMu.Lock() - defer c.itemsMu.Unlock() + c.valuesMu.Lock() + defer c.valuesMu.Unlock() - c.items[key] = cacheItem[T]{ - value: value, - timestamp: time.Now(), + c.values[key] = cacheValue[T]{ + value: value, + cachedAt: time.Now(), } return nil @@ -176,7 +186,7 @@ func (c *InMemoryCache[T]) Set(key string, value T) error { // SetAtHeight adds or updates the historical value in the cache for the given key, // and at the given height. If the cache is not configured for historical mode, it // returns an error. 
-func (c *InMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) error { +func (c *inMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) error { if !c.config.historical { return ErrHistoricalModeNotEnabled } @@ -188,120 +198,165 @@ func (c *InMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) err c.latestHeight.CompareAndSwap(latestHeight, setHeight) } - c.itemsMu.Lock() - defer c.itemsMu.Unlock() + c.valuesMu.Lock() + defer c.valuesMu.Unlock() - var itemHistory cacheItemHistory[T] - if itemHistoryAny, exists := c.items[key]; exists { - itemHistory = itemHistoryAny.(cacheItemHistory[T]) - } else { - itemsByHeight := make(map[int64]cacheItem[T]) - itemHistory = cacheItemHistory[T]{ + valueHistory, exists := c.valueHistories[key] + if !exists { + heightMap := make(map[int64]cacheValue[T]) + valueHistory = cacheValueHistory[T]{ sortedDescHeights: make([]int64, 0), - itemsByHeight: itemsByHeight, + heightMap: heightMap, } } // Update sortedDescHeights and ensure the list is sorted in descending order. - if _, setHeightExists := itemHistory.itemsByHeight[setHeight]; !setHeightExists { - itemHistory.sortedDescHeights = append(itemHistory.sortedDescHeights, setHeight) - sort.Slice(itemHistory.sortedDescHeights, func(i, j int) bool { - return itemHistory.sortedDescHeights[i] > itemHistory.sortedDescHeights[j] + if _, setHeightExists := valueHistory.heightMap[setHeight]; !setHeightExists { + valueHistory.sortedDescHeights = append(valueHistory.sortedDescHeights, setHeight) + sort.Slice(valueHistory.sortedDescHeights, func(i, j int) bool { + return valueHistory.sortedDescHeights[i] > valueHistory.sortedDescHeights[j] }) } // Prune historical values for this key, where the setHeight - // is oder than the configured pruneOlderThan. - if c.config.pruneOlderThan > 0 { - lenCachedHeights := int64(len(itemHistory.sortedDescHeights)) + // is oder than the configured numHistoricalValues. + if c.config.numHistoricalValues > 0 { + lenCachedHeights := int64(len(valueHistory.sortedDescHeights)) for heightIdx := lenCachedHeights - 1; heightIdx >= 0; heightIdx-- { - cachedHeight := itemHistory.sortedDescHeights[heightIdx] + cachedHeight := valueHistory.sortedDescHeights[heightIdx] // DEV_NOTE: Since the list is sorted, and we're iterating from lowest // (oldest) to highest (youngest) height, once we encounter a cachedHeight - // that is younger than the configured pruneOlderThan, ALL subsequent - // heights SHOULD also be younger than the configured pruneOlderThan. - if setHeight-cachedHeight <= c.config.pruneOlderThan { - itemHistory.sortedDescHeights = itemHistory.sortedDescHeights[:heightIdx+1] + // that is younger than the configured numHistoricalValues, ALL subsequent + // heights SHOULD also be younger than the configured numHistoricalValues. + if setHeight-cachedHeight <= c.config.numHistoricalValues { + valueHistory.sortedDescHeights = valueHistory.sortedDescHeights[:heightIdx+1] break } - delete(itemHistory.itemsByHeight, cachedHeight) + delete(valueHistory.heightMap, cachedHeight) } } - itemHistory.itemsByHeight[setHeight] = cacheItem[T]{ - value: value, - timestamp: time.Now(), + valueHistory.heightMap[setHeight] = cacheValue[T]{ + value: value, + cachedAt: time.Now(), } - c.items[key] = itemHistory + c.valueHistories[key] = valueHistory return nil } // Delete removes an item from the cache. 
-func (c *InMemoryCache[T]) Delete(key string) { - c.itemsMu.Lock() - defer c.itemsMu.Unlock() +func (c *inMemoryCache[T]) Delete(key string) { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() - delete(c.items, key) + if c.config.historical { + delete(c.valueHistories, key) + } else { + delete(c.values, key) + } } // Clear removes all items from the cache. -func (c *InMemoryCache[T]) Clear() { - c.itemsMu.Lock() - defer c.itemsMu.Unlock() +func (c *inMemoryCache[T]) Clear() { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + if c.config.historical { + c.valueHistories = make(map[string]cacheValueHistory[T]) + } else { + c.values = make(map[string]cacheValue[T]) + } - c.items = make(map[string]interface{}) c.latestHeight.Store(0) } // evict removes one item from the cache, to make space for a new one, // according to the configured eviction policy -func (c *InMemoryCache[T]) evict() { - switch c.config.EvictionPolicy { +func (c *inMemoryCache[T]) evict() error { + if c.config.historical { + return c.evictHistorical() + } else { + return c.evictNonHistorical() + } +} + +// evictHistorical removes one item from the cache, to make space for a new one, +// according to the configured eviction policy. +func (c *inMemoryCache[T]) evictHistorical() error { + switch c.config.evictionPolicy { case FirstInFirstOut: var oldestKey string var oldestTime time.Time - first := true - - for key, item := range c.items { - var itemTime time.Time - if c.config.historical { - itemHistory := item.(cacheItemHistory[T]) - for _, v := range itemHistory.itemsByHeight { - if itemTime.IsZero() || v.timestamp.Before(itemTime) { - itemTime = v.timestamp - } - } - } else { - itemTime = item.(cacheItem[T]).timestamp + for key, valueHistory := range c.valueHistories { + mostRecentHeight := valueHistory.sortedDescHeights[0] + value, exists := valueHistory.heightMap[mostRecentHeight] + if !exists { + return ErrCacheInternal.Wrapf( + "expected value history for key %s to contain height %d but it did not 💣", + key, mostRecentHeight, + ) } - if first || itemTime.Before(oldestTime) { + if value.cachedAt.IsZero() || value.cachedAt.Before(oldestTime) { oldestKey = key - oldestTime = itemTime - first = false + oldestTime = value.cachedAt } } - delete(c.items, oldestKey) + delete(c.valueHistories, oldestKey) + return nil case LeastRecentlyUsed: // TODO_IMPROVE: Implement LRU eviction // This will require tracking access times - panic("LRU eviction not implemented") + return ErrCacheInternal.Wrap("LRU eviction not implemented") case LeastFrequentlyUsed: // TODO_IMPROVE: Implement LFU eviction // This will require tracking access times - panic("LFU eviction not implemented") + return ErrCacheInternal.Wrap("LFU eviction not implemented") default: - // Default to FIFO if policy not recognized - for key := range c.items { - delete(c.items, key) - return + // DEV_NOTE: This SHOULD NEVER happen, QueryCacheConfig#Validate, SHOULD prevent it. + return ErrCacheInternal.Wrapf("unsupported eviction policy: %d", c.config.evictionPolicy) + } +} + +// evictNonHistorical removes one item from the cache, to make space for a new one, +// according to the configured eviction policy. 
+func (c *inMemoryCache[T]) evictNonHistorical() error { + switch c.config.evictionPolicy { + case FirstInFirstOut: + var ( + first = true + oldestKey string + oldestTime time.Time + ) + for key, value := range c.values { + if first || value.cachedAt.Before(oldestTime) { + oldestKey = key + oldestTime = value.cachedAt + } + first = false } + delete(c.values, oldestKey) + return nil + + case LeastRecentlyUsed: + // TODO_IMPROVE: Implement LRU eviction + // This will require tracking access times + return ErrCacheInternal.Wrap("LRU eviction not implemented") + + case LeastFrequentlyUsed: + // TODO_IMPROVE: Implement LFU eviction + // This will require tracking access times + return ErrCacheInternal.Wrap("LFU eviction not implemented") + + default: + // DEV_NOTE: This SHOULD NEVER happen, QueryCacheConfig#Validate, SHOULD prevent it. + return ErrCacheInternal.Wrapf("unsupported eviction policy: %d", c.config.evictionPolicy) } } diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go index 6bf6672cb..844f47637 100644 --- a/pkg/client/query/cache/memory_test.go +++ b/pkg/client/query/cache/memory_test.go @@ -12,10 +12,11 @@ import ( // TestInMemoryCache_NonHistorical tests the basic cache functionality without historical mode func TestInMemoryCache_NonHistorical(t *testing.T) { t.Run("basic operations", func(t *testing.T) { - cache := NewInMemoryCache[string]() + cache, err := NewInMemoryCache[string]() + require.NoError(t, err) // Test Set and Get - err := cache.Set("key1", "value1") + err = cache.Set("key1", "value1") require.NoError(t, err) val, err := cache.Get("key1") require.NoError(t, err) @@ -39,11 +40,12 @@ func TestInMemoryCache_NonHistorical(t *testing.T) { }) t.Run("TTL expiration", func(t *testing.T) { - cache := NewInMemoryCache[string]( + cache, err := NewInMemoryCache[string]( WithTTL(100 * time.Millisecond), ) + require.NoError(t, err) - err := cache.Set("key", "value") + err = cache.Set("key", "value") require.NoError(t, err) // Value should be available immediately @@ -59,14 +61,15 @@ func TestInMemoryCache_NonHistorical(t *testing.T) { require.ErrorIs(t, err, ErrCacheMiss) }) - t.Run("max size eviction", func(t *testing.T) { - cache := NewInMemoryCache[string]( + t.Run("max keys eviction", func(t *testing.T) { + cache, err := NewInMemoryCache[string]( WithMaxKeys(2), WithEvictionPolicy(FirstInFirstOut), ) + require.NoError(t, err) // Add items up to max size - err := cache.Set("key1", "value1") + err = cache.Set("key1", "value1") require.NoError(t, err) err = cache.Set("key2", "value2") require.NoError(t, err) @@ -93,12 +96,13 @@ func TestInMemoryCache_NonHistorical(t *testing.T) { // TestInMemoryCache_Historical tests the historical mode functionality func TestInMemoryCache_Historical(t *testing.T) { t.Run("basic historical operations", func(t *testing.T) { - cache := NewInMemoryCache[string]( + cache, err := NewInMemoryCache[string]( WithHistoricalMode(100), ) + require.NoError(t, err) // Test SetAtHeight and GetAtHeight - err := cache.SetAtHeight("key", "value1", 10) + err = cache.SetAtHeight("key", "value1", 10) require.NoError(t, err) err = cache.SetAtHeight("key", "value2", 20) require.NoError(t, err) @@ -128,12 +132,13 @@ func TestInMemoryCache_Historical(t *testing.T) { }) t.Run("historical TTL expiration", func(t *testing.T) { - cache := NewInMemoryCache[string]( + cache, err := NewInMemoryCache[string]( WithHistoricalMode(100), WithTTL(100*time.Millisecond), ) + require.NoError(t, err) - err := cache.SetAtHeight("key", "value1", 
10) + err = cache.SetAtHeight("key", "value1", 10) require.NoError(t, err) // Value should be available immediately @@ -141,7 +146,7 @@ func TestInMemoryCache_Historical(t *testing.T) { require.NoError(t, err) require.Equal(t, "value1", val) - // Wait for TTL to expire + // Wait for ttl to expire time.Sleep(150 * time.Millisecond) // Value should now be expired @@ -150,12 +155,13 @@ func TestInMemoryCache_Historical(t *testing.T) { }) t.Run("pruning old heights", func(t *testing.T) { - cache := NewInMemoryCache[string]( + cache, err := NewInMemoryCache[string]( WithHistoricalMode(10), // Prune entries older than 10 blocks ) + require.NoError(t, err) // Add entries at different heights - err := cache.SetAtHeight("key", "value1", 10) + err = cache.SetAtHeight("key", "value1", 10) require.NoError(t, err) err = cache.SetAtHeight("key", "value2", 20) require.NoError(t, err) @@ -183,12 +189,13 @@ func TestInMemoryCache_Historical(t *testing.T) { }) t.Run("non-historical operations on historical cache", func(t *testing.T) { - cache := NewInMemoryCache[string]( + cache, err := NewInMemoryCache[string]( WithHistoricalMode(100), ) + require.NoError(t, err) // Set some historical values - err := cache.SetAtHeight("key", "value1", 10) + err = cache.SetAtHeight("key", "value1", 10) require.NoError(t, err) err = cache.SetAtHeight("key", "value2", 20) require.NoError(t, err) @@ -216,10 +223,11 @@ func TestInMemoryCache_Historical(t *testing.T) { // TestInMemoryCache_ErrorCases tests various error conditions func TestInMemoryCache_ErrorCases(t *testing.T) { t.Run("historical operations on non-historical cache", func(t *testing.T) { - cache := NewInMemoryCache[string]() + cache, err := NewInMemoryCache[string]() + require.NoError(t, err) // Attempting historical operations should return error - err := cache.SetAtHeight("key", "value", 10) + err = cache.SetAtHeight("key", "value", 10) require.ErrorIs(t, err, ErrHistoricalModeNotEnabled) _, err = cache.GetAtHeight("key", 10) @@ -227,10 +235,11 @@ func TestInMemoryCache_ErrorCases(t *testing.T) { }) t.Run("zero values", func(t *testing.T) { - cache := NewInMemoryCache[string]() + cache, err := NewInMemoryCache[string]() + require.NoError(t, err) // Test with empty key - err := cache.Set("", "value") + err = cache.Set("", "value") require.NoError(t, err) val, err := cache.Get("") require.NoError(t, err) @@ -248,7 +257,9 @@ func TestInMemoryCache_ErrorCases(t *testing.T) { // TestInMemoryCache_ConcurrentAccess tests thread safety of the cache func TestInMemoryCache_ConcurrentAccess(t *testing.T) { t.Run("concurrent access non-historical", func(t *testing.T) { - cache := NewInMemoryCache[int]() + cache, err := NewInMemoryCache[int]() + require.NoError(t, err) + const numGoroutines = 10 const numOperations = 100 @@ -292,9 +303,11 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { }) t.Run("concurrent access historical", func(t *testing.T) { - cache := NewInMemoryCache[int]( + cache, err := NewInMemoryCache[int]( WithHistoricalMode(100), ) + require.NoError(t, err) + const numGoroutines = 10 const numOpsPerGoRoutine = 100 From 71da7f1beb31437bbcf4806850e358766379f962 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 18 Dec 2024 13:09:53 +0100 Subject: [PATCH 10/12] chore: review feedback improvements --- pkg/client/interface.go | 8 +- pkg/client/query/cache/config.go | 35 +++-- pkg/client/query/cache/memory.go | 180 ++++++++++++++------------ pkg/client/query/cache/memory_test.go | 80 ++++++------ 4 files changed, 165 insertions(+), 138 deletions(-) 
diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 1f56a8870..ec3874273 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -375,8 +375,8 @@ type QueryCache[T any] interface { // at multiple heights for a given key. type HistoricalQueryCache[T any] interface { QueryCache[T] - // GetAtHeight retrieves the nearest value <= the specified height - GetAtHeight(key string, height int64) (T, error) - // SetAtHeight adds or updates a value at a specific height - SetAtHeight(key string, value T, height int64) error + // GetAsOfVersion retrieves the nearest value <= the specified version number. + GetAsOfVersion(key string, version int64) (T, error) + // SetAsOfVersion adds or updates a value at a specific version number. + SetAsOfVersion(key string, value T, version int64) error } diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index 49d80435e..7df87b49c 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -4,7 +4,7 @@ import ( "time" ) -// EvictionPolicy determines which items are removed when number of keys in the cache reaches maxKeys. +// EvictionPolicy determines which values are removed when number of keys in the cache reaches maxKeys. type EvictionPolicy int64 const ( @@ -16,7 +16,7 @@ const ( // queryCacheConfig is the configuration for query caches. // It is intended to be configured via QueryCacheOptionFn functions. type queryCacheConfig struct { - // maxKeys is the maximum number of items (key/value pairs) the cache can + // maxKeys is the maximum number of key/value pairs the cache can // hold before it starts evicting. maxKeys int64 @@ -27,18 +27,19 @@ type queryCacheConfig struct { // maxCacheSize is the maximum cumulative size of all keys AND values in the cache. // maxCacheSize int64 - // evictionPolicy determines which items are removed when number of keys in the cache reaches maxKeys. + // evictionPolicy determines which values are removed when number of keys in the cache reaches maxKeys. evictionPolicy EvictionPolicy - // ttl is how long items should remain in the cache. Items older than the ttl - // MAY NOT be evicted immediately, but are NEVER considered as cache hits. + // ttl is how long values should remain valid in the cache. Items older than the + // ttl MAY NOT be evicted immediately, but are NEVER considered as cache hits. ttl time.Duration // historical determines whether each key will point to a single values (false) // or a history (i.e. reverse chronological list) of values (true). historical bool - // numHistoricalValues is the number of past blocks for which to keep historical - // values. If 0, no historical pruning is performed. It only applies when - // historical is true. - numHistoricalValues int64 + // maxVersionAge is the max difference between the latest known version and + // any other version, below which value versions are retained, and above which + // value versions are pruned. If 0, no historical pruning is performed. + // It only applies when historical is true. + maxVersionAge int64 } // QueryCacheOptionFn is a function which receives a queryCacheConfig for configuration. 
@@ -53,15 +54,23 @@ func (cfg *queryCacheConfig) Validate() error {
 		return ErrQueryCacheConfigValidation.Wrapf("eviction policy %d not implemented", cfg.evictionPolicy)
 	}
 
+	if cfg.maxVersionAge > 0 && cfg.historical == false {
+		return ErrQueryCacheConfigValidation.Wrap("maxVersionAge > 0 requires historical mode to be enabled")
+	}
+
+	if cfg.historical && cfg.maxVersionAge < 0 {
+		return ErrQueryCacheConfigValidation.Wrapf("maxVersionAge MUST be >= 0, got: %d", cfg.maxVersionAge)
+	}
+
 	return nil
 }
 
-// WithHistoricalMode enables historical caching with the given numHistoricalValues
+// WithHistoricalMode enables historical caching with the given maxVersionAge
 // configuration; if 0, no historical pruning is performed.
-func WithHistoricalMode(numHistoricalBlocks int64) QueryCacheOptionFn {
+func WithHistoricalMode(numRetainedVersions int64) QueryCacheOptionFn {
 	return func(cfg *queryCacheConfig) {
 		cfg.historical = true
-		cfg.numHistoricalValues = numHistoricalBlocks
+		cfg.maxVersionAge = numRetainedVersions
 	}
 }
 
@@ -80,7 +89,7 @@ func WithEvictionPolicy(policy EvictionPolicy) QueryCacheOptionFn {
 	}
 }
 
-// WithTTL sets the time-to-live for cached items. Items older than the TTL
+// WithTTL sets the time-to-live for cached values. Values older than the TTL
 // MAY NOT be evicted immediately, but are NEVER considered as cache hits.
 func WithTTL(ttl time.Duration) QueryCacheOptionFn {
 	return func(cfg *queryCacheConfig) {
diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go
index 414efb3a9..f229cb1d4 100644
--- a/pkg/client/query/cache/memory.go
+++ b/pkg/client/query/cache/memory.go
@@ -25,8 +25,8 @@ var (
 // inMemoryCache provides a concurrency-safe in-memory cache implementation with
 // optional historical value support.
 type inMemoryCache[T any] struct {
-	config queryCacheConfig
-	latestHeight atomic.Int64
+	config queryCacheConfig
+	latestVersion atomic.Int64
 
 	// valuesMu is used to protect values AND valueHistories from concurrent access.
 	valuesMu sync.RWMutex
@@ -43,14 +43,17 @@ type cacheValue[T any] struct {
 	cachedAt time.Time
 }
 
-// cacheValueHistory stores cachedItems by height and maintains a sorted list of
-// heights for which cached items exist. This list is sorted in descending order
-// to improve performance characteristics by positively correlating index with age.
+// cacheValueHistory stores cachedValues by version number and maintains a sorted
+// list of version numbers for which cached values exist. This list is sorted in
+// descending order to improve performance characteristics by positively correlating
+// index with age.
 type cacheValueHistory[T any] struct {
-	// sortedDescHeights is a list of the heights for which values are cached.
-	// It is sorted in descending order.
-	sortedDescHeights []int64
-	heightMap map[int64]cacheValue[T]
+	// sortedDescVersions is a list of the version numbers for which values are
+	// cached. It is sorted in descending order.
+	sortedDescVersions []int64
+	// versionToValueMap is a map from a version number to the cached value at
+	// that version number, if present.
+	versionToValueMap map[int64]cacheValue[T]
 }
 
 // NewInMemoryCache creates a new inMemoryCache with the configuration generated
@@ -75,11 +78,11 @@ func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) (*inMemoryCache[T], err
 // Get retrieves the value from the cache with the given key.
If the cache is // configured for historical mode, it will return the value at the latest **known** -// height, which is only updated on calls to SetAtHeight, and therefore is not -// guaranteed to be the current height w.r.t the blockchain. +// version, which is only updated on calls to SetAsOfVersion, and therefore is not +// guaranteed to be the current version w.r.t the blockchain. func (c *inMemoryCache[T]) Get(key string) (T, error) { if c.config.historical { - return c.GetAtHeight(key, c.latestHeight.Load()) + return c.GetAsOfVersion(key, c.latestVersion.Load()) } c.valuesMu.RLock() @@ -87,14 +90,14 @@ func (c *inMemoryCache[T]) Get(key string) (T, error) { var zero T - cachedItem, exists := c.values[key] + cachedValue, exists := c.values[key] if !exists { return zero, ErrCacheMiss.Wrapf("key: %s", key) } isTTLEnabled := c.config.ttl > 0 - isCacheItemExpired := time.Since(cachedItem.cachedAt) > c.config.ttl - if isTTLEnabled && isCacheItemExpired { + isCacheValueExpired := time.Since(cachedValue.cachedAt) > c.config.ttl + if isTTLEnabled && isCacheValueExpired { // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; // otherwise, the read lock would be insufficient. The value will be // overwritten by the next call to Set(). If usage is such that values @@ -103,14 +106,14 @@ func (c *inMemoryCache[T]) Get(key string) (T, error) { return zero, ErrCacheMiss.Wrapf("key: %s", key) } - return cachedItem.value, nil + return cachedValue.value, nil } -// GetAtHeight retrieves the value from the cache with the given key, at the given -// height. If a value is not found for that height, the value at the nearest previous -// height is returned. If the cache is not configured for historical mode, it returns -// an error. -func (c *inMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { +// GetAsOfVersion retrieves the value from the cache with the given key, as of the +// given version. If a value is not found for that version, the value at the nearest +// previous version is returned. If the cache is not configured for historical mode, +// it returns an error. +func (c *inMemoryCache[T]) GetAsOfVersion(key string, version int64) (T, error) { var zero T if !c.config.historical { @@ -120,36 +123,43 @@ func (c *inMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { c.valuesMu.RLock() defer c.valuesMu.RUnlock() - valueHistory := c.valueHistories[key] - var nearestCachedHeight int64 = -1 - for _, cachedHeight := range valueHistory.sortedDescHeights { - if cachedHeight <= getHeight { - nearestCachedHeight = cachedHeight + valueHistory, exists := c.valueHistories[key] + if !exists { + return zero, ErrCacheMiss.Wrapf("key: %s", key) + } + + var nearestCachedVersion int64 = -1 + for _, cachedVersion := range valueHistory.sortedDescVersions { + if cachedVersion <= version { + nearestCachedVersion = cachedVersion // DEV_NOTE: Since the list is sorted in descending order, once we - // encounter a cachedHeight that is less than or equal to getHeight, - // all subsequent cachedHeights SHOULD also be less than or equal to - // getHeight. + // encounter a cachedVersion that is less than or equal to version, + // all subsequent cachedVersions SHOULD also be less than or equal to + // version. 
break } } - if nearestCachedHeight == -1 { - return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) + if nearestCachedVersion == -1 { + return zero, ErrCacheMiss.Wrapf("key: %s, version: %d", key, version) } - value, exists := valueHistory.heightMap[nearestCachedHeight] + value, exists := valueHistory.versionToValueMap[nearestCachedVersion] if !exists { - return zero, ErrCacheInternal.Wrapf("failed to load historical value for key: %s, height: %d", key, getHeight) + // DEV_NOTE: This SHOULD NEVER happen. If it does, it means that the cache has been corrupted. + return zero, ErrCacheInternal.Wrapf("failed to load historical value for key: %s, version: %d", key, version) } - if c.config.ttl > 0 && time.Since(value.cachedAt) > c.config.ttl { + isTTLEnabled := c.config.ttl > 0 + isCacheValueExpired := time.Since(value.cachedAt) > c.config.ttl + if isTTLEnabled && isCacheValueExpired { // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; // otherwise, the read lock would be insufficient. The value will be pruned - // in the subsequent call to SetAtHeight() after c.config.numHistoricalValues + // in the subsequent call to SetAsOfVersion() after c.config.maxVersionAge // blocks have elapsed. If usage is such that historical values aren't being // subsequently set, numHistoricalBlocks (if configured) will eventually // cause the pruning of historical values with expired TTLs. - return zero, ErrCacheMiss.Wrapf("key: %s, height: %d", key, getHeight) + return zero, ErrCacheMiss.Wrapf("key: %s, version: %d", key, version) } return value.value, nil @@ -157,24 +167,24 @@ func (c *inMemoryCache[T]) GetAtHeight(key string, getHeight int64) (T, error) { // Set adds or updates the value in the cache for the given key. If the cache is // configured for historical mode, it will store the value at the latest **known** -// height, which is only updated on calls to SetAtHeight, and therefore is not -// guaranteed to be the current height. +// version, which is only updated on calls to SetAsOfVersion, and therefore is not +// guaranteed to be the current version w.r.t. the blockchain. func (c *inMemoryCache[T]) Set(key string, value T) error { if c.config.historical { - return c.SetAtHeight(key, value, c.latestHeight.Load()) + return c.SetAsOfVersion(key, value, c.latestVersion.Load()) } + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + isMaxKeysConfigured := c.config.maxKeys > 0 - cacheHasMaxKeys := int64(len(c.values)) >= c.config.maxKeys - if isMaxKeysConfigured && cacheHasMaxKeys { + cacheMaxKeysReached := int64(len(c.values)) >= c.config.maxKeys + if isMaxKeysConfigured && cacheMaxKeysReached { if err := c.evict(); err != nil { return err } } - c.valuesMu.Lock() - defer c.valuesMu.Unlock() - c.values[key] = cacheValue[T]{ value: value, cachedAt: time.Now(), @@ -183,19 +193,22 @@ func (c *inMemoryCache[T]) Set(key string, value T) error { return nil } -// SetAtHeight adds or updates the historical value in the cache for the given key, -// and at the given height. If the cache is not configured for historical mode, it +// SetAsOfVersion adds or updates the historical value in the cache for the given key, +// and at the version number. If the cache is not configured for historical mode, it // returns an error. 
-func (c *inMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) error { +func (c *inMemoryCache[T]) SetAsOfVersion(key string, value T, version int64) error { if !c.config.historical { return ErrHistoricalModeNotEnabled } - // Update c.latestHeight if the given setHeight is newer (higher). - latestHeight := c.latestHeight.Load() - if setHeight > latestHeight { - // NB: Only update if c.latestHeight hasn't changed since we loaded it above. - c.latestHeight.CompareAndSwap(latestHeight, setHeight) + // Update c.latestVersion if the given version is newer (higher). + latestVersion := c.latestVersion.Load() + if version > latestVersion { + // NB: Only update if c.latestVersion hasn't changed since we loaded it above. + if !c.latestVersion.CompareAndSwap(latestVersion, version) { + // Reload the latestVersion if it did change. + latestVersion = c.latestVersion.Load() + } } c.valuesMu.Lock() @@ -203,42 +216,43 @@ func (c *inMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) err valueHistory, exists := c.valueHistories[key] if !exists { - heightMap := make(map[int64]cacheValue[T]) + versionToValueMap := make(map[int64]cacheValue[T]) valueHistory = cacheValueHistory[T]{ - sortedDescHeights: make([]int64, 0), - heightMap: heightMap, + sortedDescVersions: make([]int64, 0), + versionToValueMap: versionToValueMap, } } - // Update sortedDescHeights and ensure the list is sorted in descending order. - if _, setHeightExists := valueHistory.heightMap[setHeight]; !setHeightExists { - valueHistory.sortedDescHeights = append(valueHistory.sortedDescHeights, setHeight) - sort.Slice(valueHistory.sortedDescHeights, func(i, j int) bool { - return valueHistory.sortedDescHeights[i] > valueHistory.sortedDescHeights[j] + // Update sortedDescVersions and ensure the list is sorted in descending order. + if _, versionExists := valueHistory.versionToValueMap[version]; !versionExists { + valueHistory.sortedDescVersions = append(valueHistory.sortedDescVersions, version) + sort.Slice(valueHistory.sortedDescVersions, func(i, j int) bool { + return valueHistory.sortedDescVersions[i] > valueHistory.sortedDescVersions[j] }) } - // Prune historical values for this key, where the setHeight - // is oder than the configured numHistoricalValues. - if c.config.numHistoricalValues > 0 { - lenCachedHeights := int64(len(valueHistory.sortedDescHeights)) - for heightIdx := lenCachedHeights - 1; heightIdx >= 0; heightIdx-- { - cachedHeight := valueHistory.sortedDescHeights[heightIdx] + // Prune historical values for this key, where the version + // is older than the configured maxVersionAge. + if c.config.maxVersionAge > 0 { + lenCachedVersions := int64(len(valueHistory.sortedDescVersions)) + for versionIdx := lenCachedVersions - 1; versionIdx >= 0; versionIdx-- { + cachedVersion := valueHistory.sortedDescVersions[versionIdx] // DEV_NOTE: Since the list is sorted, and we're iterating from lowest - // (oldest) to highest (youngest) height, once we encounter a cachedHeight - // that is younger than the configured numHistoricalValues, ALL subsequent - // heights SHOULD also be younger than the configured numHistoricalValues. - if setHeight-cachedHeight <= c.config.numHistoricalValues { - valueHistory.sortedDescHeights = valueHistory.sortedDescHeights[:heightIdx+1] + // (oldest) to highest (newest) version, once we encounter a cachedVersion + // that is newer than the configured maxVersionAge, ALL subsequent + // heights SHOULD also be newer than the configured maxVersionAge. 
+ cachedVersionAge := latestVersion - cachedVersion + if cachedVersionAge <= c.config.maxVersionAge { + valueHistory.sortedDescVersions = valueHistory.sortedDescVersions[:versionIdx+1] break } - delete(valueHistory.heightMap, cachedHeight) + delete(valueHistory.versionToValueMap, cachedVersion) } } - valueHistory.heightMap[setHeight] = cacheValue[T]{ + valueHistory.versionToValueMap[version] = cacheValue[T]{ value: value, cachedAt: time.Now(), } @@ -248,7 +262,7 @@ func (c *inMemoryCache[T]) SetAtHeight(key string, value T, setHeight int64) err return nil } -// Delete removes an item from the cache. +// Delete removes a value from the cache. func (c *inMemoryCache[T]) Delete(key string) { c.valuesMu.Lock() defer c.valuesMu.Unlock() @@ -260,7 +274,7 @@ func (c *inMemoryCache[T]) Delete(key string) { } } -// Clear removes all items from the cache. +// Clear removes all values from the cache. func (c *inMemoryCache[T]) Clear() { c.valuesMu.Lock() defer c.valuesMu.Unlock() @@ -271,10 +285,10 @@ func (c *inMemoryCache[T]) Clear() { c.values = make(map[string]cacheValue[T]) } - c.latestHeight.Store(0) + c.latestVersion.Store(0) } -// evict removes one item from the cache, to make space for a new one, +// evict removes one value from the cache, to make space for a new one, // according to the configured eviction policy func (c *inMemoryCache[T]) evict() error { if c.config.historical { @@ -284,20 +298,20 @@ func (c *inMemoryCache[T]) evict() error { } } -// evictHistorical removes one item from the cache, to make space for a new one, -// according to the configured eviction policy. +// evictHistorical removes one value (and all its versions) from the cache, +// to make space for a new one, according to the configured eviction policy. func (c *inMemoryCache[T]) evictHistorical() error { switch c.config.evictionPolicy { case FirstInFirstOut: var oldestKey string var oldestTime time.Time for key, valueHistory := range c.valueHistories { - mostRecentHeight := valueHistory.sortedDescHeights[0] - value, exists := valueHistory.heightMap[mostRecentHeight] + mostRecentVersion := valueHistory.sortedDescVersions[0] + value, exists := valueHistory.versionToValueMap[mostRecentVersion] if !exists { return ErrCacheInternal.Wrapf( - "expected value history for key %s to contain height %d but it did not 💣", - key, mostRecentHeight, + "expected value history for key %s to contain version %d but it did not 💣", + key, mostRecentVersion, ) } diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go index 844f47637..699949894 100644 --- a/pkg/client/query/cache/memory_test.go +++ b/pkg/client/query/cache/memory_test.go @@ -68,21 +68,21 @@ func TestInMemoryCache_NonHistorical(t *testing.T) { ) require.NoError(t, err) - // Add items up to max size + // Add values up to max keys err = cache.Set("key1", "value1") require.NoError(t, err) err = cache.Set("key2", "value2") require.NoError(t, err) - // Add one more item, should trigger eviction + // Add one more value, should trigger eviction err = cache.Set("key3", "value3") require.NoError(t, err) - // First item should be evicted + // First value should be evicted _, err = cache.Get("key1") require.ErrorIs(t, err, ErrCacheMiss) - // Other items should still be present + // Other values should still be present val, err := cache.Get("key2") require.NoError(t, err) require.Equal(t, "value2", val) @@ -101,34 +101,38 @@ func TestInMemoryCache_Historical(t *testing.T) { ) require.NoError(t, err) - // Test SetAtHeight and GetAtHeight - err = 
cache.SetAtHeight("key", "value1", 10) + // Test SetAsOfVersion and GetAsOfVersion + err = cache.SetAsOfVersion("key", "value1", 10) require.NoError(t, err) - err = cache.SetAtHeight("key", "value2", 20) + err = cache.SetAsOfVersion("key", "value2", 20) require.NoError(t, err) - // Test getting exact heights - val, err := cache.GetAtHeight("key", 10) + // Test getting exact versions + val, err := cache.GetAsOfVersion("key", 10) require.NoError(t, err) require.Equal(t, "value1", val) - val, err = cache.GetAtHeight("key", 20) + val, err = cache.GetAsOfVersion("key", 20) require.NoError(t, err) require.Equal(t, "value2", val) - // Test getting intermediate height (should return nearest lower height) - val, err = cache.GetAtHeight("key", 15) + // Test getting intermediate version (should return nearest lower version) + val, err = cache.GetAsOfVersion("key", 15) require.NoError(t, err) require.Equal(t, "value1", val) - // Test getting height before first entry - _, err = cache.GetAtHeight("key", 5) + // Test getting version before first entry + _, err = cache.GetAsOfVersion("key", 5) require.ErrorIs(t, err, ErrCacheMiss) - // Test getting height after last entry - val, err = cache.GetAtHeight("key", 25) + // Test getting version after last entry + val, err = cache.GetAsOfVersion("key", 25) require.NoError(t, err) require.Equal(t, "value2", val) + + // Test getting a version for a key that isn't cached + val, err = cache.GetAsOfVersion("key2", 20) + require.ErrorIs(t, err, ErrCacheMiss) }) t.Run("historical TTL expiration", func(t *testing.T) { @@ -138,11 +142,11 @@ func TestInMemoryCache_Historical(t *testing.T) { ) require.NoError(t, err) - err = cache.SetAtHeight("key", "value1", 10) + err = cache.SetAsOfVersion("key", "value1", 10) require.NoError(t, err) // Value should be available immediately - val, err := cache.GetAtHeight("key", 10) + val, err := cache.GetAsOfVersion("key", 10) require.NoError(t, err) require.Equal(t, "value1", val) @@ -150,40 +154,40 @@ func TestInMemoryCache_Historical(t *testing.T) { time.Sleep(150 * time.Millisecond) // Value should now be expired - _, err = cache.GetAtHeight("key", 10) + _, err = cache.GetAsOfVersion("key", 10) require.ErrorIs(t, err, ErrCacheMiss) }) - t.Run("pruning old heights", func(t *testing.T) { + t.Run("pruning old versions", func(t *testing.T) { cache, err := NewInMemoryCache[string]( WithHistoricalMode(10), // Prune entries older than 10 blocks ) require.NoError(t, err) - // Add entries at different heights - err = cache.SetAtHeight("key", "value1", 10) + // Add entries at different versions + err = cache.SetAsOfVersion("key", "value1", 10) require.NoError(t, err) - err = cache.SetAtHeight("key", "value2", 20) + err = cache.SetAsOfVersion("key", "value2", 20) require.NoError(t, err) - err = cache.SetAtHeight("key", "value3", 30) + err = cache.SetAsOfVersion("key", "value3", 30) require.NoError(t, err) // Add a new entry that should trigger pruning - err = cache.SetAtHeight("key", "value4", 40) + err = cache.SetAsOfVersion("key", "value4", 40) require.NoError(t, err) // Entries more than 10 blocks old should be pruned - _, err = cache.GetAtHeight("key", 10) + _, err = cache.GetAsOfVersion("key", 10) require.ErrorIs(t, err, ErrCacheMiss) - _, err = cache.GetAtHeight("key", 20) + _, err = cache.GetAsOfVersion("key", 20) require.ErrorIs(t, err, ErrCacheMiss) // Recent entries should still be available - val, err := cache.GetAtHeight("key", 30) + val, err := cache.GetAsOfVersion("key", 30) require.NoError(t, err) require.Equal(t, "value3", 
val)
 
-		val, err = cache.GetAtHeight("key", 40)
+		val, err = cache.GetAsOfVersion("key", 40)
 		require.NoError(t, err)
 		require.Equal(t, "value4", val)
 	})
@@ -195,12 +199,12 @@ func TestInMemoryCache_Historical(t *testing.T) {
 		require.NoError(t, err)
 
 		// Set some historical values
-		err = cache.SetAtHeight("key", "value1", 10)
+		err = cache.SetAsOfVersion("key", "value1", 10)
 		require.NoError(t, err)
-		err = cache.SetAtHeight("key", "value2", 20)
+		err = cache.SetAsOfVersion("key", "value2", 20)
 		require.NoError(t, err)
 
-		// Regular Set should work with latest height
+		// Regular Set should work with latest version
 		err = cache.Set("key", "value3")
 		require.NoError(t, err)
 
@@ -211,9 +215,9 @@ func TestInMemoryCache_Historical(t *testing.T) {
 		// Delete should remove all historical values
 		cache.Delete("key")
-		_, err = cache.GetAtHeight("key", 10)
+		_, err = cache.GetAsOfVersion("key", 10)
 		require.ErrorIs(t, err, ErrCacheMiss)
-		_, err = cache.GetAtHeight("key", 20)
+		_, err = cache.GetAsOfVersion("key", 20)
 		require.ErrorIs(t, err, ErrCacheMiss)
 		_, err = cache.Get("key")
 		require.ErrorIs(t, err, ErrCacheMiss)
 
@@ -227,10 +231,10 @@ func TestInMemoryCache_ErrorCases(t *testing.T) {
 		require.NoError(t, err)
 
 		// Attempting historical operations should return error
-		err = cache.SetAtHeight("key", "value", 10)
+		err = cache.SetAsOfVersion("key", "value", 10)
 		require.ErrorIs(t, err, ErrHistoricalModeNotEnabled)
 
-		_, err = cache.GetAtHeight("key", 10)
+		_, err = cache.GetAsOfVersion("key", 10)
 		require.ErrorIs(t, err, ErrHistoricalModeNotEnabled)
 	})
 
@@ -327,9 +331,9 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) {
 					return
 				default:
 					key := "key"
-					err := cache.SetAtHeight(key, j, int64(j))
+					err := cache.SetAsOfVersion(key, j, int64(j))
 					require.NoError(t, err)
-					_, _ = cache.GetAtHeight(key, int64(j))
+					_, _ = cache.GetAsOfVersion(key, int64(j))
 				}
 			}
 		}()

From 6774d3aea1b961fa66c6110207c16db2add3ecce Mon Sep 17 00:00:00 2001
From: Bryan White
Date: Wed, 18 Dec 2024 13:28:53 +0100
Subject: [PATCH 11/12] fix: linter errors

---
 pkg/client/query/cache/config.go | 2 +-
 pkg/client/query/cache/memory_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go
index 7df87b49c..3881bbf5e 100644
--- a/pkg/client/query/cache/config.go
+++ b/pkg/client/query/cache/config.go
@@ -54,7 +54,7 @@ func (cfg *queryCacheConfig) Validate() error {
 		return ErrQueryCacheConfigValidation.Wrapf("eviction policy %d not implemented", cfg.evictionPolicy)
 	}
 
-	if cfg.maxVersionAge > 0 && cfg.historical == false {
+	if cfg.maxVersionAge > 0 && !cfg.historical {
 		return ErrQueryCacheConfigValidation.Wrap("maxVersionAge > 0 requires historical mode to be enabled")
 	}
 
diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go
index 699949894..93c20cd9e 100644
--- a/pkg/client/query/cache/memory_test.go
+++ b/pkg/client/query/cache/memory_test.go
@@ -131,7 +131,7 @@ func TestInMemoryCache_Historical(t *testing.T) {
 		require.Equal(t, "value2", val)
 
 		// Test getting a version for a key that isn't cached
-		val, err = cache.GetAsOfVersion("key2", 20)
+		_, err = cache.GetAsOfVersion("key2", 20)
 		require.ErrorIs(t, err, ErrCacheMiss)
 	})
 
From 086d2199c88542fb92da84627795c43c7fa7bd92 Mon Sep 17 00:00:00 2001
From: Bryan White
Date: Wed, 18 Dec 2024 13:55:12 +0100
Subject: [PATCH 12/12] chore: disable #Set() on historical cache

---
 pkg/client/query/cache/errors.go | 9 +++++----
 pkg/client/query/cache/memory.go | 6 ++++--
pkg/client/query/cache/memory_test.go | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/client/query/cache/errors.go b/pkg/client/query/cache/errors.go index b80342999..b12bb5cac 100644 --- a/pkg/client/query/cache/errors.go +++ b/pkg/client/query/cache/errors.go @@ -5,8 +5,9 @@ import "cosmossdk.io/errors" const codesace = "client/query/cache" var ( - ErrCacheMiss = errors.Register(codesace, 1, "cache miss") - ErrHistoricalModeNotEnabled = errors.Register(codesace, 2, "historical mode not enabled") - ErrQueryCacheConfigValidation = errors.Register(codesace, 3, "invalid query cache config") - ErrCacheInternal = errors.Register(codesace, 4, "cache internal error") + ErrCacheMiss = errors.Register(codesace, 1, "cache miss") + ErrHistoricalModeNotEnabled = errors.Register(codesace, 2, "historical mode not enabled") + ErrQueryCacheConfigValidation = errors.Register(codesace, 3, "invalid query cache config") + ErrCacheInternal = errors.Register(codesace, 4, "cache internal error") + ErrUnsupportedHistoricalModeOp = errors.Register(codesace, 5, "operation not supported in historical mode") ) diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index f229cb1d4..088f3f5d1 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -171,7 +171,7 @@ func (c *inMemoryCache[T]) GetAsOfVersion(key string, version int64) (T, error) // guaranteed to be the current version w.r.t. the blockchain. func (c *inMemoryCache[T]) Set(key string, value T) error { if c.config.historical { - return c.SetAsOfVersion(key, value, c.latestVersion.Load()) + return ErrUnsupportedHistoricalModeOp.Wrap("inMemoryCache#Set() is not supported in historical mode") } c.valuesMu.Lock() @@ -205,7 +205,9 @@ func (c *inMemoryCache[T]) SetAsOfVersion(key string, value T, version int64) er latestVersion := c.latestVersion.Load() if version > latestVersion { // NB: Only update if c.latestVersion hasn't changed since we loaded it above. - if !c.latestVersion.CompareAndSwap(latestVersion, version) { + if c.latestVersion.CompareAndSwap(latestVersion, version) { + latestVersion = version + } else { // Reload the latestVersion if it did change. latestVersion = c.latestVersion.Load() } diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go index 93c20cd9e..ac5a742ad 100644 --- a/pkg/client/query/cache/memory_test.go +++ b/pkg/client/query/cache/memory_test.go @@ -206,12 +206,12 @@ func TestInMemoryCache_Historical(t *testing.T) { // Regular Set should work with latest version err = cache.Set("key", "value3") - require.NoError(t, err) + require.ErrorIs(t, err, ErrUnsupportedHistoricalModeOp) // Regular Get should return the latest value val, err := cache.Get("key") require.NoError(t, err) - require.Equal(t, "value3", val) + require.Equal(t, "value2", val) // Delete should remove all historical values cache.Delete("key")