Skip to content

Commit

Permalink
Cache: Add ReadThrough mode (#386)
Browse files Browse the repository at this point in the history
* Rename AllowStale to AnyCachedValue
* Rename MaxStaleness to AllowStale
  • Loading branch information
bartelink authored Jun 5, 2023
1 parent c7c4f62 commit b33b08f
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 75 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox`: `StreamId` replaces usage of `FsCodec.StreamName` [#353](https://github.com/jet/equinox/pull/353) [#378](https://github.com/jet/equinox/pull/378)
- `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
- `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.DeciderCore`: C# friendly equivalent of `Decider` (i.e. `Func` and `Task`) [#338](https://github.com/jet/equinox/pull/338)
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
Expand All @@ -27,6 +28,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Performance: Switch surface APIs to `struct` Tuples and Options where relevant, some due to `struct` changes in [`FsCodec` #82](https://github.com/jet/FsCodec/pull/82), and use `task` in hot paths [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
Expand Down
6 changes: 3 additions & 3 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ let create resolve = Service(streamId >> resolve Category)
```

`Read` above will do a roundtrip to the Store in order to fetch the most recent
state (in `AllowStale` mode, the store roundtrip can be optimized out by
state (in `AnyCachedValue` or `AllowStale` modes, the store roundtrip can be optimized out by
reading through the cache). This Synchronous Read can be used to
[Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency)
to establish a state incorporating the effects of any Command invocation you
Expand Down Expand Up @@ -1313,7 +1313,7 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St
member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state }, opt)
Expand Down Expand Up @@ -1375,7 +1375,7 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
type Service ... =
member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Some Equinox.AllowStale else Equinox.RequireLoad
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
return interpretMany Fold.fold (Seq.map interpret commands) state }
#endif
let decider = resolve cartId
let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.TransactAsync(interpret, opt)

member x.ExecuteManyAsync(cartId, optimistic, commands: Command seq, ?prepare): Async<unit> =
Expand All @@ -166,7 +166,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
decider.Query id
member _.ReadStale cartId =
let decider = resolve cartId
decider.Query(id, Equinox.LoadOption.AllowStale)
decider.Query(id, Equinox.LoadOption.AnyCachedValue)

let create resolve =
Service(streamId >> resolve Category)
2 changes: 1 addition & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S

member _.ReadStale(email) =
let decider = resolve email
decider.Query(id, Equinox.AllowStale)
decider.Query(id, Equinox.AnyCachedValue)

let create resolve =
Service(streamId >> resolve Category)
1 change: 1 addition & 0 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
/// Await the outcome of the computation.
member _.Await() = workflow.Value

/// Singleton Empty value
static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(System.InvalidOperationException "Uninitialized AsyncLazy"))

/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
Expand Down
119 changes: 88 additions & 31 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,120 @@ module internal CacheItemOptions =
| RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative)

type ICache =
abstract member TryGet:
key: string
-> Task<struct (StreamToken * 'state) voption>
abstract member UpdateIfNewer:
key: string
* isStale: System.Func<StreamToken, StreamToken, bool>
abstract member Load: key: string
* maxAge: TimeSpan
* isStale: Func<StreamToken, StreamToken, bool>
* options: CacheItemOptions
* token: StreamToken
* state: 'state
-> Task<unit>
* loadOrReload: (struct (StreamToken * 'state) voption -> Task<struct (StreamToken * 'state)>)
-> Task<struct (StreamToken * 'state)>
abstract member Save: key: string
* isStale: Func<StreamToken, StreamToken, bool>
* options: CacheItemOptions
* timestamp: int64
* token: StreamToken * state: 'state
-> unit

namespace Equinox

open Equinox.Core
open Equinox.Core.Tracing
open System
open System.Threading.Tasks

type internal CacheEntry<'state>(initialToken: StreamToken, initialState: 'state) =
type internal CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) =
let mutable currentToken = initialToken
let mutable currentState = initialState
member x.Value: struct (StreamToken * 'state) =
lock x <| fun () ->
currentToken, currentState
member x.UpdateIfNewer(isStale: System.Func<StreamToken, StreamToken, bool>, other: CacheEntry<'state>) =
let mutable verifiedTimestamp = initialTimestamp
let tryGet () =
if verifiedTimestamp = 0 then ValueNone
else ValueSome (struct (currentToken, currentState))
let mutable cell = AsyncLazy<struct(int64 * (struct (StreamToken * 'state)))>.Empty
static member CreateEmpty() =
new CacheEntry<'state>(Unchecked.defaultof<StreamToken>, Unchecked.defaultof<'state>, 0)
member x.TryGetValue(): (struct (StreamToken * 'state)) voption =
lock x tryGet
member x.MergeUpdates(isStale: Func<StreamToken, StreamToken, bool>, timestamp, token, state) =
lock x <| fun () ->
let struct (candidateToken, state) = other.Value
if not (isStale.Invoke(currentToken, candidateToken)) then
currentToken <- candidateToken
if not (isStale.Invoke(currentToken, token)) then
currentToken <- token
currentState <- state
if verifiedTimestamp < timestamp then // Don't count attempts to overwrite with stale state as verification
verifiedTimestamp <- timestamp
// Follows high level flow of AsyncCacheCell.Await - read the comments there, and the AsyncCacheCell tests first!
member x.ReadThrough(maxAge: TimeSpan, isStale, load) = task {
let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds
let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp)
match lock x fetchStateConsistently with
| _, ValueSome cachedValue, true ->
return cachedValue
| ourInitialCellState, maybeBaseState, _ -> // If it's not good enough for us, trigger a request (though someone may have beaten us to that)

// Inspect/await any concurrent attempt to see if it is sufficient for our needs
match! ourInitialCellState.TryAwaitValid() with
| ValueSome (fetchCommencedTimestamp, res) when isWithinMaxAge fetchCommencedTimestamp -> return res
| _ ->

// .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid)
let newInstance = AsyncLazy(load maybeBaseState)
let _ = System.Threading.Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState)
let! timestamp, (token, state as res) = cell.Await()
x.MergeUpdates(isStale, timestamp, token, state) // merge observed result into the cache
return res }

type Cache private (inner: System.Runtime.Caching.MemoryCache) =
let tryLoad key =
match inner.Get key with
| null -> ValueNone
| :? CacheEntry<'state> as existingEntry -> existingEntry.TryGetValue()
| x -> failwith $"tryLoad Incompatible cache entry %A{x}"
let addOrGet key options entry =
match inner.AddOrGetExisting(key, entry, CacheItemOptions.toPolicy options) with
| null -> Ok entry
| :? CacheEntry<'state> as existingEntry -> Error existingEntry
| x -> failwith $"addOrGet Incompatible cache entry %A{x}"
let getElseAddEmptyEntry key options =
match addOrGet key options (CacheEntry<'state>.CreateEmpty()) with
| Ok fresh -> fresh
| Error existingEntry -> existingEntry
let addOrMergeCacheEntry isStale key options timestamp struct (token, state) =
let entry = CacheEntry(token, state, timestamp)
match addOrGet key options entry with
| Ok _ -> () // Our fresh one got added
| Error existingEntry -> existingEntry.MergeUpdates(isStale, timestamp, token, state)
new (name, sizeMb: int) =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
Cache(new System.Runtime.Caching.MemoryCache(name, config))
interface ICache with
member _.TryGet key =
match inner.Get key with
| null -> ValueNone |> Task.FromResult
| :? CacheEntry<'state> as existingEntry -> ValueSome existingEntry.Value |> Task.FromResult
| x -> failwithf "TryGet Incompatible cache entry %A" x
member _.UpdateIfNewer(key, isStale, options, token, state) =
let freshEntry = CacheEntry(token, state)
match inner.AddOrGetExisting(key, freshEntry, CacheItemOptions.toPolicy options) with
| null -> Task.FromResult()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer(isStale, freshEntry); Task.FromResult()
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
// if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws)
member _.Load(key, maxAge, isStale, options, loadOrReload) = task {
let loadOrReload maybeBaseState () = task {
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
let! res = loadOrReload maybeBaseState
return struct (ts, res) }
if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
let maybeBaseState = tryLoad key
let! timestamp, res = loadOrReload maybeBaseState ()
addOrMergeCacheEntry isStale key options timestamp res
return res
else // ensure we have an entry in the cache for this key; coordinate retrieval through that
let cacheSlot = getElseAddEmptyEntry key options
return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) }
// Newer values get saved; equal values update the last retrieval timestamp
member _.Save(key, isStale, options, timestamp, token, state) =
addOrMergeCacheEntry isStale key options timestamp (token, state)

type [<NoComparison; NoEquality; RequireQualifiedAccess>] CachingStrategy =
/// Retain a single 'state per streamName.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless a <c>LoadOption</c> is used, cache hits still incur a roundtrip to load any subsequently-added events.
| SlidingWindow of ICache * window: TimeSpan
/// Retain a single 'state per streamName.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless a <c>LoadOption</c> is used, cache hits still incur a roundtrip to load any subsequently-added events.
| FixedTimeSpan of ICache * period: TimeSpan
/// Prefix is used to segregate multiple folded states per stream when they are stored in the cache.
/// Semantics are otherwise identical to <c>SlidingWindow</c>.
Expand Down
26 changes: 8 additions & 18 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module Equinox.Core.Caching

open Equinox.Core.Tracing
open Serilog
open System
open System.Threading
Expand All @@ -12,32 +11,23 @@ type IReloadable<'state> =

let private tee f (inner: CancellationToken -> Task<struct (StreamToken * 'state)>) ct = task {
let! tokenAndState = inner ct
do! f tokenAndState
f tokenAndState
return tokenAndState }

type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state> >
(category: 'cat, cache: ICache, isStale, createKey, createOptions) =
let tryRead key = task {
let! cacheItem = cache.TryGet key
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome cacheItem) |> ignore
return cacheItem }
let save key (inner: CancellationToken -> Task<struct (StreamToken * 'state)>) ct = task {
let! struct (token, state) as res = inner ct
do! cache.UpdateIfNewer(key, isStale, createOptions (), token, state)
return res }
interface ICategory<'event, 'state, 'context> with
member _.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) = task {
let key = createKey streamName
match! tryRead key with
| ValueNone -> return! save key (fun ct -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)) ct
| ValueSome tokenAndState when maxAge = TimeSpan.MaxValue -> return tokenAndState // read already updated TTL, no need to write
| ValueSome (token, state) -> return! save key (fun ct -> category.Reload(log, streamName, requireLeader, token, state, ct)) ct }
let loadOrReload = function
| ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)
| ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct)
return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload) }
member _.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
let save struct (token, state) = cache.UpdateIfNewer(createKey streamName, isStale, createOptions (), token, state)
let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place
let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state)
match! category.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with
| SyncResult.Written tokenAndState' ->
do! save tokenAndState'
save tokenAndState'
return SyncResult.Written tokenAndState'
| SyncResult.Conflict resync ->
return SyncResult.Conflict (tee save resync) }
Expand Down
6 changes: 3 additions & 3 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,14 +1328,14 @@ type CachingStrategy =
/// Retain a single 'state per streamName, together with the associated etag.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
/// Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip
| SlidingWindow of ICache * window: TimeSpan
/// Retain a single 'state per streamName, together with the associated etag.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Typically combined with `Equinox.LoadOption.AllowStale` to minimize loads.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
/// Typically combined with an `Equinox.LoadOption` to minimize loads.
/// Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
| FixedTimeSpan of ICache * period: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
Expand Down
Loading

0 comments on commit b33b08f

Please sign in to comment.