Skip to content

Commit

Permalink
Add ResolveOption.AllowStale #157; Cache ES writes (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Oct 14, 2019
1 parent 6e309b1 commit 564b191
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 66 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- store-neutral `ICache`; centralized implementation in `Equinox.Core` [#161](https://github.com/jet/equinox/pull/161) :pray: [@DSilence](https://github.com/DSilence)
- `ResolveOption.AllowStale`, maximizing use of OCC for `Stream.Transact`, enabling stale reads (in the face of multiple writers) for `Stream.Query` [#167](https://github.com/jet/equinox/pull/167)

### Changed

Expand All @@ -20,6 +21,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Extracted `Equinox.Core` module [#164](https://github.com/jet/equinox/pull/164)
- Used `Transact` name consistently in `Accumulator` (follow-up to [#97](https://github.com/jet/equinox/pull/97)) [#166](https://github.com/jet/equinox/pull/166)
- Changed all curried Methods to tupled
- `.EventStore` now caches written values [#167](https://github.com/jet/equinox/pull/167)

### Removed
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ type Service(log, stream, ?maxAttempts) =

The `Stream`-related functions in a given Aggregate establish the access patterns used across when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to a `Equinox.Stream` equivalent (see [`src/Equinox/Equinox.fs`](src/Equinox/Equinox.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs).

`Read` above will do a roundtrip to the Store in order to fetch the most recent state (while this can be optimized by reading through the cache, each invocation will hit the store regardless). This Synchronous Read can be used to [Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency) to establish a state incorporating the effects of any Command invocation you know to have been completed.
`Read` above will do a roundtrip to the Store in order to fetch the most recent state (in `AllowStale` mode, the store roundtrip can be optimized out by reading through the cache). This Synchronous Read can be used to [Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency) to establish a state incorporating the effects of any Command invocation you know to have been completed.

`Execute` runs an Optimistic Concurrency Controlled `Transact` loop in order to effect the intent of the [write-only] Command. This involves:

Expand Down
14 changes: 8 additions & 6 deletions samples/Store/Backend/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ open Domain
open Domain.Cart

type Service(log, resolveStream) =
let (|AggregateId|) (id: CartId) = Equinox.AggregateId ("Cart", CartId.toStringN id)
let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, maxAttempts = 3)
let (|AggregateId|) (id: CartId, opt) = Equinox.AggregateId ("Cart", CartId.toStringN id), opt
let (|Stream|) (AggregateId (id,opt)) = Equinox.Stream(log, resolveStream (id,opt), maxAttempts = 3)

let flowAsync (Stream stream) (flow, prepare) =
stream.TransactAsync(fun state -> async {
Expand All @@ -17,13 +17,15 @@ type Service(log, resolveStream) =
let read (Stream stream) : Async<Folds.State> =
stream.Query id
let execute cartId command =
flowAsync cartId ((fun _ctx execute -> execute command), None)
flowAsync (cartId,None) ((fun _ctx execute -> execute command), None)

member __.FlowAsync(cartId, flow, ?prepare) =
flowAsync cartId (flow, prepare)
member __.FlowAsync(cartId, optimistic, flow, ?prepare) =
flowAsync (cartId,if optimistic then Some Equinox.AllowStale else None) (flow, prepare)

member __.Execute(cartId, command) =
execute cartId command

member __.Read cartId =
read cartId
read (cartId,None)
member __.ReadStale cartId =
read (cartId,Some Equinox.ResolveOption.AllowStale)
12 changes: 6 additions & 6 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ let snapshot = Domain.Cart.Folds.isOrigin, Domain.Cart.Folds.compact
let createMemoryStore () =
new VolatileStore ()
let createServiceMemory log store =
Backend.Cart.Service(log, Resolver(store, fold, initial).Resolve)
Backend.Cart.Service(log, Resolver(store, fold, initial).ResolveEx)

let codec = Domain.Cart.Events.codec

let resolveGesStreamWithRollingSnapshots gateway =
EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve
EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).ResolveEx
let resolveGesStreamWithoutCustomAccessStrategy gateway =
EventStore.Resolver(gateway, codec, fold, initial).Resolve
EventStore.Resolver(gateway, codec, fold, initial).ResolveEx

let resolveCosmosStreamWithSnapshotStrategy gateway =
Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve
Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).ResolveEx
let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching).Resolve
Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching).ResolveEx

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
service.FlowAsync(cartId, false, fun _ctx execute ->
for i in 1..count do
execute <| Domain.Cart.AddItem (context, skuId, i)
if i <> count then
Expand Down
6 changes: 3 additions & 3 deletions src/Equinox.Core/Stream.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
module Equinox.Core.Stream

/// Represents a specific stream in a ICategory
type private Stream<'event, 'state, 'streamId>(category : ICategory<'event, 'state, 'streamId>, streamId: 'streamId) =
type private Stream<'event, 'state, 'streamId>(category : ICategory<'event, 'state, 'streamId>, streamId: 'streamId, opt) =
interface IStream<'event, 'state> with
member __.Load log =
category.Load(log, streamId)
category.Load(log, streamId, opt)
member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
category.TrySync(log, token, originState, events)

let create (category : ICategory<'event, 'state, 'streamId>) streamId : IStream<'event, 'state> = Stream(category, streamId) :> _
let create (category : ICategory<'event, 'state, 'streamId>) opt streamId : IStream<'event, 'state> = Stream(category, streamId, opt) :> _

/// Handles case where some earlier processing has loaded or determined a the state of a stream, allowing us to avoid a read roundtrip
type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) =
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.Core/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ open System.Diagnostics
/// Store-agnostic interface representing interactions an Application can have with a set of streams with a common event type
type ICategory<'event, 'state, 'streamId> =
/// Obtain the state from the target stream
abstract Load : log: ILogger * 'streamId -> Async<StreamToken * 'state>
abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async<StreamToken * 'state>
/// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding:
/// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
Expand Down
18 changes: 10 additions & 8 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ module Caching =
let! tokenAndState = load
return! intercept streamName tokenAndState }
interface ICategory<'event, 'state, Container*string> with
member __.Load(log, (container,streamName)) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, (container,streamName))) streamName
member __.Load(log, (container,streamName), opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, (container,streamName), opt)) streamName
member __.TrySync(log : ILogger, (Token.Unpack (_container,stream,_) as streamToken), state, events : 'event list)
: Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, streamToken, state, events)
Expand Down Expand Up @@ -913,12 +913,13 @@ type private Folder<'event, 'state>
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
let batched log containerStream = category.Load inspectUnfolds containerStream fold initial isOrigin log
interface ICategory<'event, 'state, Container*string> with
member __.Load(log, (container,streamName)): Async<StreamToken * 'state> =
member __.Load(log, (container,streamName), opt): Async<StreamToken * 'state> =
match readCache with
| None -> batched log (container,streamName)
| Some (cache : ICache, prefix : string) -> async {
match! cache.TryGet(prefix + streamName) with
| None -> return! batched log (container,streamName)
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some tokenAndState -> return! category.LoadFromToken tokenAndState fold isOrigin log }
member __.TrySync(log : ILogger, streamToken, state, events : 'event list)
: Async<SyncResult<'state>> = async {
Expand Down Expand Up @@ -1011,9 +1012,9 @@ type Resolver<'event, 'state>(context : Context, codec, fold, initial, caching,
| CachingStrategy.SlidingWindow(cache, window) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

let resolveStream (streamId, maybeContainerInitializationGate) =
let resolveStream (streamId, maybeContainerInitializationGate) opt =
{ new IStream<'event, 'state> with
member __.Load log = category.Load(log, streamId)
member __.Load log = category.Load(log, streamId, opt)
member __.TrySync(log: ILogger, token: StreamToken, originState: 'state, events: 'event list) =
match maybeContainerInitializationGate with
| None -> category.TrySync(log, token, originState, events)
Expand All @@ -1026,13 +1027,14 @@ type Resolver<'event, 'state>(context : Context, codec, fold, initial, caching,

member __.Resolve(target, [<O; D null>]?option) =
match resolveTarget target, option with
| streamArgs,None -> resolveStream streamArgs
| streamArgs,(None|Some AllowStale) -> resolveStream streamArgs option
| (containerStream,maybeInit),Some AssumeEmpty ->
Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit))
Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit) option)
member __.ResolveEx(target,opt) = __.Resolve(target,?option=opt)

member __.FromMemento(Token.Unpack (container,stream,_pos) as streamToken,state) =
let skipInitialization = None
Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization))
Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization) None)

[<RequireQualifiedAccess; NoComparison>]
type Discovery =
Expand Down
17 changes: 10 additions & 7 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,15 @@ module Caching =
let! tokenAndState = load
return! intercept streamName tokenAndState }
interface ICategory<'event, 'state, string> with
member __.Load(log, streamName : string) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, streamName)) streamName
member __.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
loadAndIntercept (inner.Load(log, streamName, opt)) streamName
member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list) : Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync(log, token, state, events)
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name)
| SyncResult.Written (token',state') ->
return SyncResult.Written (token', state') }
let! intercepted = intercept stream.name (token', state')
return SyncResult.Written intercepted }

let applyCacheUpdatesWithSlidingExpiration
(cache: ICache)
Expand All @@ -474,12 +475,13 @@ module Caching =
type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) =
let batched log streamName = category.Load fold initial streamName log
interface ICategory<'event, 'state, string> with
member __.Load(log, streamName) : Async<StreamToken * 'state> =
member __.Load(log, streamName, opt) : Async<StreamToken * 'state> =
match readCache with
| None -> batched log streamName
| Some (cache : ICache, prefix : string) -> async {
match! cache.TryGet(prefix + streamName) with
| None -> return! batched log streamName
| Some tokenAndState when opt = Some AllowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }
member __.TrySync(log : ILogger, token, initialState, events : 'event list) : Async<SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, initialState) events
Expand Down Expand Up @@ -524,12 +526,13 @@ type Resolver<'event,'state>
let resolveTarget = function AggregateId (cat,streamId) -> sprintf "%s-%s" cat streamId | StreamName streamName -> streamName
member __.Resolve(target, [<O; D null>]?option) =
match resolveTarget target, option with
| sn,None -> resolveStream sn
| sn,Some AssumeEmpty -> Stream.ofMemento (context.LoadEmpty sn,initial) (resolveStream sn)
| sn,(None|Some AllowStale) -> resolveStream option sn
| sn,Some AssumeEmpty -> Stream.ofMemento (context.LoadEmpty sn,initial) (resolveStream option sn)
member __.ResolveEx(target,opt) = __.Resolve(target,?option=opt)

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack token as streamToken, state) =
Stream.ofMemento (streamToken,state) (resolveStream token.stream.name)
Stream.ofMemento (streamToken,state) (resolveStream None token.stream.name)

type private SerilogAdapter(log : ILogger) =
interface EventStore.ClientAPI.ILogger with
Expand Down
6 changes: 3 additions & 3 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ module private Token =
/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
type Category<'event, 'state>(store : VolatileStore, fold, initial) =
interface ICategory<'event, 'state, string> with
member __.Load(log, streamName) = async {
member __.Load(log, streamName, _opt) = async {
match store.TryLoad<'event> streamName log with
| None -> return Token.ofEmpty streamName initial
| Some events -> return Token.ofEventArray streamName fold initial events }
Expand All @@ -97,11 +97,11 @@ type Category<'event, 'state>(store : VolatileStore, fold, initial) =

type Resolver<'event, 'state>(store : VolatileStore, fold, initial) =
let category = Category<'event,'state>(store, fold, initial)
let resolveStream streamName = Stream.create category streamName
let resolveStream streamName = Stream.create category None streamName
let resolveTarget = function AggregateId (cat,streamId) -> sprintf "%s-%s" cat streamId | StreamName streamName -> streamName
member __.Resolve(target : Target, [<Optional; DefaultParameterValue null>] ?option) =
match resolveTarget target, option with
| sn,None -> resolveStream sn
| sn,(None|Some AllowStale) -> resolveStream sn
| sn,Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn)
member __.ResolveEx(target,opt) = __.Resolve(target,?option=opt)

Expand Down
6 changes: 5 additions & 1 deletion src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ type Target =
/// Store-agnostic <c>Context.Resolve</c> Options
type ResolveOption =
/// Without consulting Cache or any other source, assume the Stream to be empty for the initial Query or Transact
| AssumeEmpty
| AssumeEmpty
/// If the Cache holds a value, use that without checking the backing store for updates, implying:
/// - maximizing use of OCC for `Stream.Transact`
/// - enabling potentially stale reads [in the face of multiple writers)] (for `Stream.Query`)
| AllowStale
Loading

0 comments on commit 564b191

Please sign in to comment.