diff --git a/pkg/client/interface.go b/pkg/client/interface.go
index 365c24b74..ec3874273 100644
--- a/pkg/client/interface.go
+++ b/pkg/client/interface.go
@@ -360,3 +360,23 @@ type BankQueryClient interface {
 	// GetBalance queries the chain for the uPOKT balance of the account provided
 	GetBalance(ctx context.Context, address string) (*cosmostypes.Coin, error)
 }
+
+// QueryCache is a key/value store style interface for a cache of a single type.
+// It is intended to be used to cache query responses (or derivatives thereof),
+// where each key uniquely indexes the most recent query response.
+type QueryCache[T any] interface {
+	Get(key string) (T, error)
+	Set(key string, value T) error
+	Delete(key string)
+	Clear()
+}
+
+// HistoricalQueryCache extends QueryCache to support getting and setting values
+// at multiple versions for a given key.
+type HistoricalQueryCache[T any] interface {
+	QueryCache[T]
+	// GetAsOfVersion retrieves the nearest value <= the specified version number.
+	GetAsOfVersion(key string, version int64) (T, error)
+	// SetAsOfVersion adds or updates a value at a specific version number.
+	SetAsOfVersion(key string, value T, version int64) error
+}
diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go
new file mode 100644
index 000000000..3881bbf5e
--- /dev/null
+++ b/pkg/client/query/cache/config.go
@@ -0,0 +1,98 @@
+package cache
+
+import (
+	"time"
+)
+
+// EvictionPolicy determines which values are removed when the number of keys in the cache reaches maxKeys.
+type EvictionPolicy int64
+
+const (
+	FirstInFirstOut = EvictionPolicy(iota)
+	LeastRecentlyUsed
+	LeastFrequentlyUsed
+)
+
+// queryCacheConfig is the configuration for query caches.
+// It is intended to be configured via QueryCacheOptionFn functions.
+type queryCacheConfig struct {
+	// maxKeys is the maximum number of key/value pairs the cache can
+	// hold before it starts evicting.
+	maxKeys int64
+
+	// TODO_CONSIDERATION:
+	//
+	// maxValueSize is the maximum cumulative size of all values in the cache.
+	// maxValueSize int64
+	// maxCacheSize is the maximum cumulative size of all keys AND values in the cache.
+	// maxCacheSize int64
+
+	// evictionPolicy determines which values are removed when the number of keys in the cache reaches maxKeys.
+	evictionPolicy EvictionPolicy
+	// ttl is how long values should remain valid in the cache. Items older than the
+	// ttl MAY NOT be evicted immediately, but are NEVER considered as cache hits.
+	ttl time.Duration
+	// historical determines whether each key will point to a single value (false)
+	// or a history (i.e. a reverse-chronological list) of values (true).
+	historical bool
+	// maxVersionAge is the max difference between the latest known version and
+	// any other version, below which value versions are retained, and above which
+	// value versions are pruned. If 0, no historical pruning is performed.
+	// It only applies when historical is true.
+	maxVersionAge int64
+}
+
+// QueryCacheOptionFn is a function which receives a queryCacheConfig for configuration.
+type QueryCacheOptionFn func(*queryCacheConfig)
+
+// Validate ensures that the queryCacheConfig isn't configured with incompatible options.
+func (cfg *queryCacheConfig) Validate() error {
+	switch cfg.evictionPolicy {
+	case FirstInFirstOut:
+		// TODO_IMPROVE: support LeastRecentlyUsed and LeastFrequentlyUsed policies.
+	default:
+		return ErrQueryCacheConfigValidation.Wrapf("eviction policy %d not implemented", cfg.evictionPolicy)
+	}
+
+	if cfg.maxVersionAge > 0 && !cfg.historical {
+		return ErrQueryCacheConfigValidation.Wrap("maxVersionAge > 0 requires historical mode to be enabled")
+	}
+
+	if cfg.historical && cfg.maxVersionAge < 0 {
+		return ErrQueryCacheConfigValidation.Wrapf("maxVersionAge MUST be >= 0, got: %d", cfg.maxVersionAge)
+	}
+
+	return nil
+}
+
+// WithHistoricalMode enables historical caching with the given maxVersionAge
+// configuration; if 0, no historical pruning is performed.
+func WithHistoricalMode(maxVersionAge int64) QueryCacheOptionFn {
+	return func(cfg *queryCacheConfig) {
+		cfg.historical = true
+		cfg.maxVersionAge = maxVersionAge
+	}
+}
+
+// WithMaxKeys sets the maximum number of distinct key/value pairs the cache will
+// hold before evicting according to the configured eviction policy.
+func WithMaxKeys(maxKeys int64) QueryCacheOptionFn {
+	return func(cfg *queryCacheConfig) {
+		cfg.maxKeys = maxKeys
+	}
+}
+
+// WithEvictionPolicy sets the eviction policy.
+func WithEvictionPolicy(policy EvictionPolicy) QueryCacheOptionFn {
+	return func(cfg *queryCacheConfig) {
+		cfg.evictionPolicy = policy
+	}
+}
+
+// WithTTL sets the time-to-live for cached values. Values older than the TTL
+// MAY NOT be evicted immediately, but are NEVER considered as cache hits.
+func WithTTL(ttl time.Duration) QueryCacheOptionFn {
+	return func(cfg *queryCacheConfig) {
+		cfg.ttl = ttl
+	}
+}
diff --git a/pkg/client/query/cache/errors.go b/pkg/client/query/cache/errors.go
new file mode 100644
index 000000000..b12bb5cac
--- /dev/null
+++ b/pkg/client/query/cache/errors.go
@@ -0,0 +1,13 @@
+package cache
+
+import "cosmossdk.io/errors"
+
+const codespace = "client/query/cache"
+
+var (
+	ErrCacheMiss                   = errors.Register(codespace, 1, "cache miss")
+	ErrHistoricalModeNotEnabled    = errors.Register(codespace, 2, "historical mode not enabled")
+	ErrQueryCacheConfigValidation  = errors.Register(codespace, 3, "invalid query cache config")
+	ErrCacheInternal               = errors.Register(codespace, 4, "cache internal error")
+	ErrUnsupportedHistoricalModeOp = errors.Register(codespace, 5, "operation not supported in historical mode")
+)
diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go
new file mode 100644
index 000000000..088f3f5d1
--- /dev/null
+++ b/pkg/client/query/cache/memory.go
@@ -0,0 +1,378 @@
+package cache
+
+import (
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/pokt-network/poktroll/pkg/client"
+)
+
+var (
+	_ client.QueryCache[any]           = (*inMemoryCache[any])(nil)
+	_ client.HistoricalQueryCache[any] = (*inMemoryCache[any])(nil)
+
+	DefaultQueryCacheConfig = queryCacheConfig{
+		evictionPolicy: FirstInFirstOut,
+		// TODO_MAINNET(@bryanchriswhite): Consider how we can "guarantee" good
+		// alignment between the TTL and the block production rate,
+		// by accessing onchain block times directly.
+		ttl: time.Minute,
+	}
+)
+
+// inMemoryCache provides a concurrency-safe in-memory cache implementation with
+// optional historical value support.
+type inMemoryCache[T any] struct {
+	config        queryCacheConfig
+	latestVersion atomic.Int64
+
+	// valuesMu is used to protect values AND valueHistories from concurrent access.
+	valuesMu sync.RWMutex
+	// values holds the cached values in non-historical mode.
+	values map[string]cacheValue[T]
+	// valueHistories holds the cached historical values in historical mode.
+ valueHistories map[string]cacheValueHistory[T] +} + +// cacheValue wraps cached values with a cachedAt for later comparison against +// the configured TTL. +type cacheValue[T any] struct { + value T + cachedAt time.Time +} + +// cacheValueHistory stores cachedValues by version number and maintains a sorted +// list of version numbers for which cached values exist. This list is sorted in +// descending order to improve performance characteristics by positively correlating +// index with age. +type cacheValueHistory[T any] struct { + // sortedDescVersions is a list of the version numbers for which values are + // cached. It is sorted in descending order. + sortedDescVersions []int64 + // versionToValueMap is a map from a version number to the cached value at + // that version number, if present. + versionToValueMap map[int64]cacheValue[T] +} + +// NewInMemoryCache creates a new inMemoryCache with the configuration generated +// by the given option functions. +func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) (*inMemoryCache[T], error) { + config := DefaultQueryCacheConfig + + for _, opt := range opts { + opt(&config) + } + + if err := config.Validate(); err != nil { + return nil, err + } + + return &inMemoryCache[T]{ + values: make(map[string]cacheValue[T]), + valueHistories: make(map[string]cacheValueHistory[T]), + config: config, + }, nil +} + +// Get retrieves the value from the cache with the given key. If the cache is +// configured for historical mode, it will return the value at the latest **known** +// version, which is only updated on calls to SetAsOfVersion, and therefore is not +// guaranteed to be the current version w.r.t the blockchain. +func (c *inMemoryCache[T]) Get(key string) (T, error) { + if c.config.historical { + return c.GetAsOfVersion(key, c.latestVersion.Load()) + } + + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() + + var zero T + + cachedValue, exists := c.values[key] + if !exists { + return zero, ErrCacheMiss.Wrapf("key: %s", key) + } + + isTTLEnabled := c.config.ttl > 0 + isCacheValueExpired := time.Since(cachedValue.cachedAt) > c.config.ttl + if isTTLEnabled && isCacheValueExpired { + // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; + // otherwise, the read lock would be insufficient. The value will be + // overwritten by the next call to Set(). If usage is such that values + // aren't being subsequently set, maxKeys (if configured) will eventually + // cause the pruning of values with expired TTLs. + return zero, ErrCacheMiss.Wrapf("key: %s", key) + } + + return cachedValue.value, nil +} + +// GetAsOfVersion retrieves the value from the cache with the given key, as of the +// given version. If a value is not found for that version, the value at the nearest +// previous version is returned. If the cache is not configured for historical mode, +// it returns an error. 
+func (c *inMemoryCache[T]) GetAsOfVersion(key string, version int64) (T, error) { + var zero T + + if !c.config.historical { + return zero, ErrHistoricalModeNotEnabled + } + + c.valuesMu.RLock() + defer c.valuesMu.RUnlock() + + valueHistory, exists := c.valueHistories[key] + if !exists { + return zero, ErrCacheMiss.Wrapf("key: %s", key) + } + + var nearestCachedVersion int64 = -1 + for _, cachedVersion := range valueHistory.sortedDescVersions { + if cachedVersion <= version { + nearestCachedVersion = cachedVersion + // DEV_NOTE: Since the list is sorted in descending order, once we + // encounter a cachedVersion that is less than or equal to version, + // all subsequent cachedVersions SHOULD also be less than or equal to + // version. + break + } + } + + if nearestCachedVersion == -1 { + return zero, ErrCacheMiss.Wrapf("key: %s, version: %d", key, version) + } + + value, exists := valueHistory.versionToValueMap[nearestCachedVersion] + if !exists { + // DEV_NOTE: This SHOULD NEVER happen. If it does, it means that the cache has been corrupted. + return zero, ErrCacheInternal.Wrapf("failed to load historical value for key: %s, version: %d", key, version) + } + + isTTLEnabled := c.config.ttl > 0 + isCacheValueExpired := time.Since(value.cachedAt) > c.config.ttl + if isTTLEnabled && isCacheValueExpired { + // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; + // otherwise, the read lock would be insufficient. The value will be pruned + // in the subsequent call to SetAsOfVersion() after c.config.maxVersionAge + // blocks have elapsed. If usage is such that historical values aren't being + // subsequently set, numHistoricalBlocks (if configured) will eventually + // cause the pruning of historical values with expired TTLs. + return zero, ErrCacheMiss.Wrapf("key: %s, version: %d", key, version) + } + + return value.value, nil +} + +// Set adds or updates the value in the cache for the given key. If the cache is +// configured for historical mode, it will store the value at the latest **known** +// version, which is only updated on calls to SetAsOfVersion, and therefore is not +// guaranteed to be the current version w.r.t. the blockchain. +func (c *inMemoryCache[T]) Set(key string, value T) error { + if c.config.historical { + return ErrUnsupportedHistoricalModeOp.Wrap("inMemoryCache#Set() is not supported in historical mode") + } + + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + isMaxKeysConfigured := c.config.maxKeys > 0 + cacheMaxKeysReached := int64(len(c.values)) >= c.config.maxKeys + if isMaxKeysConfigured && cacheMaxKeysReached { + if err := c.evict(); err != nil { + return err + } + } + + c.values[key] = cacheValue[T]{ + value: value, + cachedAt: time.Now(), + } + + return nil +} + +// SetAsOfVersion adds or updates the historical value in the cache for the given key, +// and at the version number. If the cache is not configured for historical mode, it +// returns an error. +func (c *inMemoryCache[T]) SetAsOfVersion(key string, value T, version int64) error { + if !c.config.historical { + return ErrHistoricalModeNotEnabled + } + + // Update c.latestVersion if the given version is newer (higher). + latestVersion := c.latestVersion.Load() + if version > latestVersion { + // NB: Only update if c.latestVersion hasn't changed since we loaded it above. + if c.latestVersion.CompareAndSwap(latestVersion, version) { + latestVersion = version + } else { + // Reload the latestVersion if it did change. 
+ latestVersion = c.latestVersion.Load() + } + } + + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + valueHistory, exists := c.valueHistories[key] + if !exists { + versionToValueMap := make(map[int64]cacheValue[T]) + valueHistory = cacheValueHistory[T]{ + sortedDescVersions: make([]int64, 0), + versionToValueMap: versionToValueMap, + } + } + + // Update sortedDescVersions and ensure the list is sorted in descending order. + if _, versionExists := valueHistory.versionToValueMap[version]; !versionExists { + valueHistory.sortedDescVersions = append(valueHistory.sortedDescVersions, version) + sort.Slice(valueHistory.sortedDescVersions, func(i, j int) bool { + return valueHistory.sortedDescVersions[i] > valueHistory.sortedDescVersions[j] + }) + } + + // Prune historical values for this key, where the version + // is older than the configured maxVersionAge. + if c.config.maxVersionAge > 0 { + lenCachedVersions := int64(len(valueHistory.sortedDescVersions)) + for versionIdx := lenCachedVersions - 1; versionIdx >= 0; versionIdx-- { + cachedVersion := valueHistory.sortedDescVersions[versionIdx] + + // DEV_NOTE: Since the list is sorted, and we're iterating from lowest + // (oldest) to highest (newest) version, once we encounter a cachedVersion + // that is newer than the configured maxVersionAge, ALL subsequent + // heights SHOULD also be newer than the configured maxVersionAge. + cachedVersionAge := latestVersion - cachedVersion + if cachedVersionAge <= c.config.maxVersionAge { + valueHistory.sortedDescVersions = valueHistory.sortedDescVersions[:versionIdx+1] + break + } + + delete(valueHistory.versionToValueMap, cachedVersion) + } + } + + valueHistory.versionToValueMap[version] = cacheValue[T]{ + value: value, + cachedAt: time.Now(), + } + + c.valueHistories[key] = valueHistory + + return nil +} + +// Delete removes a value from the cache. +func (c *inMemoryCache[T]) Delete(key string) { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + if c.config.historical { + delete(c.valueHistories, key) + } else { + delete(c.values, key) + } +} + +// Clear removes all values from the cache. +func (c *inMemoryCache[T]) Clear() { + c.valuesMu.Lock() + defer c.valuesMu.Unlock() + + if c.config.historical { + c.valueHistories = make(map[string]cacheValueHistory[T]) + } else { + c.values = make(map[string]cacheValue[T]) + } + + c.latestVersion.Store(0) +} + +// evict removes one value from the cache, to make space for a new one, +// according to the configured eviction policy +func (c *inMemoryCache[T]) evict() error { + if c.config.historical { + return c.evictHistorical() + } else { + return c.evictNonHistorical() + } +} + +// evictHistorical removes one value (and all its versions) from the cache, +// to make space for a new one, according to the configured eviction policy. 
+func (c *inMemoryCache[T]) evictHistorical() error {
+	switch c.config.evictionPolicy {
+	case FirstInFirstOut:
+		var oldestKey string
+		var oldestTime time.Time
+		for key, valueHistory := range c.valueHistories {
+			mostRecentVersion := valueHistory.sortedDescVersions[0]
+			value, exists := valueHistory.versionToValueMap[mostRecentVersion]
+			if !exists {
+				return ErrCacheInternal.Wrapf(
+					"expected value history for key %s to contain version %d but it did not 💣",
+					key, mostRecentVersion,
+				)
+			}
+
+			if oldestTime.IsZero() || value.cachedAt.Before(oldestTime) {
+				oldestKey = key
+				oldestTime = value.cachedAt
+			}
+		}
+		delete(c.valueHistories, oldestKey)
+		return nil
+
+	case LeastRecentlyUsed:
+		// TODO_IMPROVE: Implement LRU eviction
+		// This will require tracking access times
+		return ErrCacheInternal.Wrap("LRU eviction not implemented")
+
+	case LeastFrequentlyUsed:
+		// TODO_IMPROVE: Implement LFU eviction
+		// This will require tracking access frequencies
+		return ErrCacheInternal.Wrap("LFU eviction not implemented")
+
+	default:
+		// DEV_NOTE: This SHOULD NEVER happen; queryCacheConfig#Validate SHOULD prevent it.
+		return ErrCacheInternal.Wrapf("unsupported eviction policy: %d", c.config.evictionPolicy)
+	}
+}
+
+// evictNonHistorical removes one item from the cache, to make space for a new one,
+// according to the configured eviction policy.
+func (c *inMemoryCache[T]) evictNonHistorical() error {
+	switch c.config.evictionPolicy {
+	case FirstInFirstOut:
+		var (
+			first      = true
+			oldestKey  string
+			oldestTime time.Time
+		)
+		for key, value := range c.values {
+			if first || value.cachedAt.Before(oldestTime) {
+				oldestKey = key
+				oldestTime = value.cachedAt
+			}
+			first = false
+		}
+		delete(c.values, oldestKey)
+		return nil
+
+	case LeastRecentlyUsed:
+		// TODO_IMPROVE: Implement LRU eviction
+		// This will require tracking access times
+		return ErrCacheInternal.Wrap("LRU eviction not implemented")
+
+	case LeastFrequentlyUsed:
+		// TODO_IMPROVE: Implement LFU eviction
+		// This will require tracking access frequencies
+		return ErrCacheInternal.Wrap("LFU eviction not implemented")
+
+	default:
+		// DEV_NOTE: This SHOULD NEVER happen; queryCacheConfig#Validate SHOULD prevent it.
+ return ErrCacheInternal.Wrapf("unsupported eviction policy: %d", c.config.evictionPolicy) + } +} diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go new file mode 100644 index 000000000..ac5a742ad --- /dev/null +++ b/pkg/client/query/cache/memory_test.go @@ -0,0 +1,356 @@ +package cache + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestInMemoryCache_NonHistorical tests the basic cache functionality without historical mode +func TestInMemoryCache_NonHistorical(t *testing.T) { + t.Run("basic operations", func(t *testing.T) { + cache, err := NewInMemoryCache[string]() + require.NoError(t, err) + + // Test Set and Get + err = cache.Set("key1", "value1") + require.NoError(t, err) + val, err := cache.Get("key1") + require.NoError(t, err) + require.Equal(t, "value1", val) + + // Test missing key + _, err = cache.Get("nonexistent") + require.ErrorIs(t, err, ErrCacheMiss) + + // Test Delete + cache.Delete("key1") + _, err = cache.Get("key1") + require.ErrorIs(t, err, ErrCacheMiss) + + // Test Clear + err = cache.Set("key2", "value2") + require.NoError(t, err) + cache.Clear() + _, err = cache.Get("key2") + require.ErrorIs(t, err, ErrCacheMiss) + }) + + t.Run("TTL expiration", func(t *testing.T) { + cache, err := NewInMemoryCache[string]( + WithTTL(100 * time.Millisecond), + ) + require.NoError(t, err) + + err = cache.Set("key", "value") + require.NoError(t, err) + + // Value should be available immediately + val, err := cache.Get("key") + require.NoError(t, err) + require.Equal(t, "value", val) + + // Wait for TTL to expire + time.Sleep(150 * time.Millisecond) + + // Value should now be expired + _, err = cache.Get("key") + require.ErrorIs(t, err, ErrCacheMiss) + }) + + t.Run("max keys eviction", func(t *testing.T) { + cache, err := NewInMemoryCache[string]( + WithMaxKeys(2), + WithEvictionPolicy(FirstInFirstOut), + ) + require.NoError(t, err) + + // Add values up to max keys + err = cache.Set("key1", "value1") + require.NoError(t, err) + err = cache.Set("key2", "value2") + require.NoError(t, err) + + // Add one more value, should trigger eviction + err = cache.Set("key3", "value3") + require.NoError(t, err) + + // First value should be evicted + _, err = cache.Get("key1") + require.ErrorIs(t, err, ErrCacheMiss) + + // Other values should still be present + val, err := cache.Get("key2") + require.NoError(t, err) + require.Equal(t, "value2", val) + + val, err = cache.Get("key3") + require.NoError(t, err) + require.Equal(t, "value3", val) + }) +} + +// TestInMemoryCache_Historical tests the historical mode functionality +func TestInMemoryCache_Historical(t *testing.T) { + t.Run("basic historical operations", func(t *testing.T) { + cache, err := NewInMemoryCache[string]( + WithHistoricalMode(100), + ) + require.NoError(t, err) + + // Test SetAsOfVersion and GetAsOfVersion + err = cache.SetAsOfVersion("key", "value1", 10) + require.NoError(t, err) + err = cache.SetAsOfVersion("key", "value2", 20) + require.NoError(t, err) + + // Test getting exact versions + val, err := cache.GetAsOfVersion("key", 10) + require.NoError(t, err) + require.Equal(t, "value1", val) + + val, err = cache.GetAsOfVersion("key", 20) + require.NoError(t, err) + require.Equal(t, "value2", val) + + // Test getting intermediate version (should return nearest lower version) + val, err = cache.GetAsOfVersion("key", 15) + require.NoError(t, err) + require.Equal(t, "value1", val) + + // Test getting version before first entry + _, err = 
cache.GetAsOfVersion("key", 5)
+		require.ErrorIs(t, err, ErrCacheMiss)
+
+		// Test getting version after last entry
+		val, err = cache.GetAsOfVersion("key", 25)
+		require.NoError(t, err)
+		require.Equal(t, "value2", val)
+
+		// Test getting a version for a key that isn't cached
+		_, err = cache.GetAsOfVersion("key2", 20)
+		require.ErrorIs(t, err, ErrCacheMiss)
+	})
+
+	t.Run("historical TTL expiration", func(t *testing.T) {
+		cache, err := NewInMemoryCache[string](
+			WithHistoricalMode(100),
+			WithTTL(100*time.Millisecond),
+		)
+		require.NoError(t, err)
+
+		err = cache.SetAsOfVersion("key", "value1", 10)
+		require.NoError(t, err)
+
+		// Value should be available immediately
+		val, err := cache.GetAsOfVersion("key", 10)
+		require.NoError(t, err)
+		require.Equal(t, "value1", val)
+
+		// Wait for TTL to expire
+		time.Sleep(150 * time.Millisecond)
+
+		// Value should now be expired
+		_, err = cache.GetAsOfVersion("key", 10)
+		require.ErrorIs(t, err, ErrCacheMiss)
+	})
+
+	t.Run("pruning old versions", func(t *testing.T) {
+		cache, err := NewInMemoryCache[string](
+			WithHistoricalMode(10), // Prune versions more than 10 behind the latest
+		)
+		require.NoError(t, err)
+
+		// Add entries at different versions
+		err = cache.SetAsOfVersion("key", "value1", 10)
+		require.NoError(t, err)
+		err = cache.SetAsOfVersion("key", "value2", 20)
+		require.NoError(t, err)
+		err = cache.SetAsOfVersion("key", "value3", 30)
+		require.NoError(t, err)
+
+		// Add a new entry that should trigger pruning
+		err = cache.SetAsOfVersion("key", "value4", 40)
+		require.NoError(t, err)
+
+		// Versions more than 10 behind the latest should be pruned
+		_, err = cache.GetAsOfVersion("key", 10)
+		require.ErrorIs(t, err, ErrCacheMiss)
+		_, err = cache.GetAsOfVersion("key", 20)
+		require.ErrorIs(t, err, ErrCacheMiss)
+
+		// Recent entries should still be available
+		val, err := cache.GetAsOfVersion("key", 30)
+		require.NoError(t, err)
+		require.Equal(t, "value3", val)
+
+		val, err = cache.GetAsOfVersion("key", 40)
+		require.NoError(t, err)
+		require.Equal(t, "value4", val)
+	})
+
+	t.Run("non-historical operations on historical cache", func(t *testing.T) {
+		cache, err := NewInMemoryCache[string](
+			WithHistoricalMode(100),
+		)
+		require.NoError(t, err)
+
+		// Set some historical values
+		err = cache.SetAsOfVersion("key", "value1", 10)
+		require.NoError(t, err)
+		err = cache.SetAsOfVersion("key", "value2", 20)
+		require.NoError(t, err)
+
+		// Regular Set is not supported in historical mode
+		err = cache.Set("key", "value3")
+		require.ErrorIs(t, err, ErrUnsupportedHistoricalModeOp)
+
+		// Regular Get should return the value at the latest known version
+		val, err := cache.Get("key")
+		require.NoError(t, err)
+		require.Equal(t, "value2", val)
+
+		// Delete should remove all historical values
+		cache.Delete("key")
+		_, err = cache.GetAsOfVersion("key", 10)
+		require.ErrorIs(t, err, ErrCacheMiss)
+		_, err = cache.GetAsOfVersion("key", 20)
+		require.ErrorIs(t, err, ErrCacheMiss)
+		_, err = cache.Get("key")
+		require.ErrorIs(t, err, ErrCacheMiss)
+	})
+}
+
+// TestInMemoryCache_ErrorCases tests various error conditions
+func TestInMemoryCache_ErrorCases(t *testing.T) {
+	t.Run("historical operations on non-historical cache", func(t *testing.T) {
+		cache, err := NewInMemoryCache[string]()
+		require.NoError(t, err)
+
+		// Attempting historical operations should return an error
+		err = cache.SetAsOfVersion("key", "value", 10)
+		require.ErrorIs(t, err, ErrHistoricalModeNotEnabled)
+
+		_, err = cache.GetAsOfVersion("key", 10)
+		require.ErrorIs(t, err, ErrHistoricalModeNotEnabled)
+	})
+
t.Run("zero values", func(t *testing.T) { + cache, err := NewInMemoryCache[string]() + require.NoError(t, err) + + // Test with empty key + err = cache.Set("", "value") + require.NoError(t, err) + val, err := cache.Get("") + require.NoError(t, err) + require.Equal(t, "value", val) + + // Test with empty value + err = cache.Set("key", "") + require.NoError(t, err) + val, err = cache.Get("key") + require.NoError(t, err) + require.Equal(t, "", val) + }) +} + +// TestInMemoryCache_ConcurrentAccess tests thread safety of the cache +func TestInMemoryCache_ConcurrentAccess(t *testing.T) { + t.Run("concurrent access non-historical", func(t *testing.T) { + cache, err := NewInMemoryCache[int]() + require.NoError(t, err) + + const numGoroutines = 10 + const numOperations = 100 + + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < numOperations; j++ { + select { + case <-ctx.Done(): + return + default: + key := "key" + err := cache.Set(key, j) + require.NoError(t, err) + _, _ = cache.Get(key) + } + } + }() + } + + // Wait for waitgroup with timeout. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + t.Errorf("test timed out waiting for workgroup to complete: %+v", ctx.Err()) + case <-done: + t.Log("test completed successfully") + } + }) + + t.Run("concurrent access historical", func(t *testing.T) { + cache, err := NewInMemoryCache[int]( + WithHistoricalMode(100), + ) + require.NoError(t, err) + + const numGoroutines = 10 + const numOpsPerGoRoutine = 100 + + // Create a context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < numOpsPerGoRoutine; j++ { + select { + case <-ctx.Done(): + return + default: + key := "key" + err := cache.SetAsOfVersion(key, j, int64(j)) + require.NoError(t, err) + _, _ = cache.GetAsOfVersion(key, int64(j)) + } + } + }() + } + + // Wait for waitgroup with timeout. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + t.Errorf("test timed out waiting for goroutines to complete: %+v", ctx.Err()) + case <-done: + t.Log("test completed successfully") + } + }) +}
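
Not part of the patch above: a minimal usage sketch of the cache API introduced in this diff, assuming the import path github.com/pokt-network/poktroll/pkg/client/query/cache shown in the file headers. The keys ("balance/pokt1example", "param/compute_units") and values are illustrative only, not taken from the codebase.

package main

import (
	"fmt"
	"time"

	"github.com/pokt-network/poktroll/pkg/client/query/cache"
)

func main() {
	// Non-historical cache: each key holds only the most recently Set value.
	// Values expire after the TTL, and FIFO eviction kicks in once maxKeys is reached.
	qc, err := cache.NewInMemoryCache[string](
		cache.WithMaxKeys(1000),
		cache.WithEvictionPolicy(cache.FirstInFirstOut),
		cache.WithTTL(time.Minute),
	)
	if err != nil {
		panic(err)
	}
	if err := qc.Set("balance/pokt1example", "420upokt"); err != nil {
		panic(err)
	}
	if balance, err := qc.Get("balance/pokt1example"); err == nil {
		fmt.Println("cached balance:", balance)
	}

	// Historical cache: each key holds a version-indexed history. Versions more
	// than maxVersionAge (here 100) behind the latest known version are pruned
	// on subsequent SetAsOfVersion calls.
	hc, err := cache.NewInMemoryCache[string](
		cache.WithHistoricalMode(100),
		cache.WithTTL(time.Minute),
	)
	if err != nil {
		panic(err)
	}
	_ = hc.SetAsOfVersion("param/compute_units", "v1", 10)
	_ = hc.SetAsOfVersion("param/compute_units", "v2", 20)

	// A lookup at version 15 falls back to the nearest earlier cached version (10).
	if v, err := hc.GetAsOfVersion("param/compute_units", 15); err == nil {
		fmt.Println("as of version 15:", v) // prints "v1"
	}
}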