From 564b191752be7c80d8e35aea52fc67173b564566 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Mon, 14 Oct 2019 02:22:58 +0100
Subject: [PATCH] Add ResolveOption.AllowStale #157; Cache ES writes (#167)

---
 CHANGELOG.md                                 |  2 +
 DOCUMENTATION.md                             |  2 +-
 samples/Store/Backend/Cart.fs                | 14 ++--
 samples/Store/Integration/CartIntegration.fs | 12 ++--
 src/Equinox.Core/Stream.fs                   |  6 +-
 src/Equinox.Core/Types.fs                    |  2 +-
 src/Equinox.Cosmos/Cosmos.fs                 | 18 ++---
 src/Equinox.EventStore/EventStore.fs         | 17 +++--
 src/Equinox.MemoryStore/MemoryStore.fs       |  6 +-
 src/Equinox/Equinox.fs                       |  6 +-
 .../CosmosIntegration.fs                     | 32 ++++++---
 .../EventStoreIntegration.fs                 | 67 ++++++++++++++-----
 .../MemoryStoreIntegration.fs                |  6 +-
 13 files changed, 124 insertions(+), 66 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 978755cf5..13be86a78 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ### Added

 - store-neutral `ICache`; centralized implementation in `Equinox.Core` [#161](https://github.com/jet/equinox/pull/161) :pray: [@DSilence](https://github.com/DSilence)
+- `ResolveOption.AllowStale`, maximizing use of OCC for `Stream.Transact`, enabling stale reads (in the face of multiple writers) for `Stream.Query` [#167](https://github.com/jet/equinox/pull/167)

 ### Changed
@@ -20,6 +21,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 - Extracted `Equinox.Core` module [#164](https://github.com/jet/equinox/pull/164)
 - Used `Transact` name consistently in `Accumulator` (follow-up to [#97](https://github.com/jet/equinox/pull/97)) [#166](https://github.com/jet/equinox/pull/166)
 - Changed all curried Methods to tupled
+- `.EventStore` now caches written values [#167](https://github.com/jet/equinox/pull/167)

 ### Removed
 ### Fixed
diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index 501c73166..e0620f612 100644
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -344,7 +344,7 @@ type Service(log, stream, ?maxAttempts) =

 The `Stream`-related functions in a given Aggregate establish the access patterns used when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to an `Equinox.Stream` equivalent (see [`src/Equinox/Equinox.fs`](src/Equinox/Equinox.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs).

-`Read` above will do a roundtrip to the Store in order to fetch the most recent state (while this can be optimized by reading through the cache, each invocation will hit the store regardless). This Synchronous Read can be used to [Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency) to establish a state incorporating the effects of any Command invocation you know to have been completed.
+`Read` above will do a roundtrip to the Store in order to fetch the most recent state (in `AllowStale` mode, the store roundtrip can be optimized out by reading through the cache). This Synchronous Read can be used to [Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency) to establish a state incorporating the effects of any Command invocation you know to have been completed.

 `Execute` runs an Optimistic Concurrency Controlled `Transact` loop in order to effect the intent of the [write-only] Command. This involves:
diff --git a/samples/Store/Backend/Cart.fs b/samples/Store/Backend/Cart.fs
index 9bf34b80b..f32ff2a90 100644
--- a/samples/Store/Backend/Cart.fs
+++ b/samples/Store/Backend/Cart.fs
@@ -4,8 +4,8 @@ open Domain
 open Domain.Cart

 type Service(log, resolveStream) =
-    let (|AggregateId|) (id: CartId) = Equinox.AggregateId ("Cart", CartId.toStringN id)
-    let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, maxAttempts = 3)
+    let (|AggregateId|) (id: CartId, opt) = Equinox.AggregateId ("Cart", CartId.toStringN id), opt
+    let (|Stream|) (AggregateId (id,opt)) = Equinox.Stream(log, resolveStream (id,opt), maxAttempts = 3)

     let flowAsync (Stream stream) (flow, prepare) =
         stream.TransactAsync(fun state -> async {
@@ -17,13 +17,15 @@ type Service(log, resolveStream) =
     let read (Stream stream) : Async<Folds.State> = stream.Query id
     let execute cartId command =
-        flowAsync cartId ((fun _ctx execute -> execute command), None)
+        flowAsync (cartId,None) ((fun _ctx execute -> execute command), None)

-    member __.FlowAsync(cartId, flow, ?prepare) =
-        flowAsync cartId (flow, prepare)
+    member __.FlowAsync(cartId, optimistic, flow, ?prepare) =
+        flowAsync (cartId,if optimistic then Some Equinox.AllowStale else None) (flow, prepare)
     member __.Execute(cartId, command) =
         execute cartId command
     member __.Read cartId =
-        read cartId
\ No newline at end of file
+        read (cartId,None)
+    member __.ReadStale cartId =
+        read (cartId,Some Equinox.ResolveOption.AllowStale)
\ No newline at end of file
diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs
index 57a6061a2..a7e1aec53 100644
--- a/samples/Store/Integration/CartIntegration.fs
+++ b/samples/Store/Integration/CartIntegration.fs
@@ -14,22 +14,22 @@ let snapshot = Domain.Cart.Folds.isOrigin, Domain.Cart.Folds.compact
 let createMemoryStore () = new VolatileStore ()
 let createServiceMemory log store =
-    Backend.Cart.Service(log, Resolver(store, fold, initial).Resolve)
+    Backend.Cart.Service(log, Resolver(store, fold, initial).ResolveEx)

 let codec = Domain.Cart.Events.codec
 let resolveGesStreamWithRollingSnapshots gateway =
-    EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve
+    EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).ResolveEx
 let resolveGesStreamWithoutCustomAccessStrategy gateway =
-    EventStore.Resolver(gateway, codec, fold, initial).Resolve
+    EventStore.Resolver(gateway, codec, fold, initial).ResolveEx

 let resolveCosmosStreamWithSnapshotStrategy gateway =
-    Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve
+    Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).ResolveEx
 let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
-    Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching).Resolve
+    Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching).ResolveEx

 let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
-    service.FlowAsync(cartId, fun _ctx execute ->
+    service.FlowAsync(cartId, false, fun _ctx execute ->
         for i in 1..count do
             execute <| Domain.Cart.AddItem (context, skuId, i)
             if i <> count then
diff --git a/src/Equinox.Core/Stream.fs b/src/Equinox.Core/Stream.fs
index 01be05597..89606d589 100644
--- a/src/Equinox.Core/Stream.fs
+++ b/src/Equinox.Core/Stream.fs
@@ -2,14 +2,14 @@ module Equinox.Core.Stream

 /// Represents a specific stream in an ICategory
-type private Stream<'event, 'state, 'streamId>(category : ICategory<'event, 'state, 'streamId>, streamId: 'streamId) =
+type private Stream<'event, 'state, 'streamId>(category : ICategory<'event, 'state, 'streamId>, streamId: 'streamId, opt) =
     interface IStream<'event, 'state> with
         member __.Load log =
-            category.Load(log, streamId)
+            category.Load(log, streamId, opt)
         member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
             category.TrySync(log, token, originState, events)

-let create (category : ICategory<'event, 'state, 'streamId>) streamId : IStream<'event, 'state> = Stream(category, streamId) :> _
+let create (category : ICategory<'event, 'state, 'streamId>) opt streamId : IStream<'event, 'state> = Stream(category, streamId, opt) :> _

 /// Handles case where some earlier processing has loaded or determined the state of a stream, allowing us to avoid a read roundtrip
 type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) =
diff --git a/src/Equinox.Core/Types.fs b/src/Equinox.Core/Types.fs
index 920afb898..388ca41e1 100644
--- a/src/Equinox.Core/Types.fs
+++ b/src/Equinox.Core/Types.fs
@@ -8,7 +8,7 @@ open System.Diagnostics

 /// Store-agnostic interface representing interactions an Application can have with a set of streams with a common event type
 type ICategory<'event, 'state, 'streamId> =
     /// Obtain the state from the target stream
-    abstract Load : log: ILogger * 'streamId -> Async<StreamToken * 'state>
+    abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async<StreamToken * 'state>
     /// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding:
     /// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream
     /// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index aafd32678..dc9bca4f4 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -883,8 +883,8 @@ module Caching =
             let! tokenAndState = load
             return! intercept streamName tokenAndState }
         interface ICategory<'event, 'state, Container*string> with
-            member __.Load(log, (container,streamName)) : Async<StreamToken * 'state> =
-                loadAndIntercept (inner.Load(log, (container,streamName))) streamName
+            member __.Load(log, (container,streamName), opt) : Async<StreamToken * 'state> =
+                loadAndIntercept (inner.Load(log, (container,streamName), opt)) streamName
             member __.TrySync(log : ILogger, (Token.Unpack (_container,stream,_) as streamToken), state, events : 'event list) : Async<SyncResult<'state>> = async {
                 let! syncRes = inner.TrySync(log, streamToken, state, events)
@@ -913,12 +913,13 @@ type private Folder<'event, 'state>
     let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
     let batched log containerStream = category.Load inspectUnfolds containerStream fold initial isOrigin log
     interface ICategory<'event, 'state, Container*string> with
-        member __.Load(log, (container,streamName)): Async<StreamToken * 'state> =
+        member __.Load(log, (container,streamName), opt): Async<StreamToken * 'state> =
             match readCache with
             | None -> batched log (container,streamName)
             | Some (cache : ICache, prefix : string) -> async {
                 match! cache.TryGet(prefix + streamName) with
                 | None -> return! batched log (container,streamName)
+                | Some tokenAndState when opt = Some AllowStale -> return tokenAndState
                 | Some tokenAndState -> return! category.LoadFromToken tokenAndState fold isOrigin log }
         member __.TrySync(log : ILogger, streamToken, state, events : 'event list) : Async<SyncResult<'state>> = async {
@@ -1011,9 +1012,9 @@ type Resolver<'event, 'state>(context : Context, codec, fold, initial, caching,
         | CachingStrategy.SlidingWindow(cache, window) -> Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

-    let resolveStream (streamId, maybeContainerInitializationGate) =
+    let resolveStream (streamId, maybeContainerInitializationGate) opt =
         { new IStream<'event, 'state> with
-            member __.Load log = category.Load(log, streamId)
+            member __.Load log = category.Load(log, streamId, opt)
             member __.TrySync(log: ILogger, token: StreamToken, originState: 'state, events: 'event list) =
                 match maybeContainerInitializationGate with
                 | None -> category.TrySync(log, token, originState, events)
@@ -1026,13 +1027,14 @@
     member __.Resolve(target, []?option) =
         match resolveTarget target, option with
-        | streamArgs,None -> resolveStream streamArgs
+        | streamArgs,(None|Some AllowStale) -> resolveStream streamArgs option
         | (containerStream,maybeInit),Some AssumeEmpty ->
-            Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit))
+            Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit) option)
+    member __.ResolveEx(target,opt) = __.Resolve(target,?option=opt)

     member __.FromMemento(Token.Unpack (container,stream,_pos) as streamToken,state) =
         let skipInitialization = None
-        Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization))
+        Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization) None)

 []
 type Discovery =
diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs
index d61b064e1..f032f8a56 100644
--- a/src/Equinox.EventStore/EventStore.fs
+++ b/src/Equinox.EventStore/EventStore.fs
@@ -451,14 +451,15 @@ module Caching =
             let! tokenAndState = load
             return! intercept streamName tokenAndState }
         interface ICategory<'event, 'state, string> with
-            member __.Load(log, streamName : string) : Async<StreamToken * 'state> =
-                loadAndIntercept (inner.Load(log, streamName)) streamName
+            member __.Load(log, streamName : string, opt) : Async<StreamToken * 'state> =
+                loadAndIntercept (inner.Load(log, streamName, opt)) streamName
             member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list) : Async<SyncResult<'state>> = async {
                 let! syncRes = inner.TrySync(log, token, state, events)
                 match syncRes with
                 | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name)
                 | SyncResult.Written (token',state') ->
-                    return SyncResult.Written (token', state') }
+                    let! intercepted = intercept stream.name (token', state')
+                    return SyncResult.Written intercepted }

     let applyCacheUpdatesWithSlidingExpiration
             (cache: ICache)
@@ -474,12 +475,13 @@ module Caching =
 type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) =
     let batched log streamName = category.Load fold initial streamName log
     interface ICategory<'event, 'state, string> with
-        member __.Load(log, streamName) : Async<StreamToken * 'state> =
+        member __.Load(log, streamName, opt) : Async<StreamToken * 'state> =
             match readCache with
             | None -> batched log streamName
             | Some (cache : ICache, prefix : string) -> async {
                 match! cache.TryGet(prefix + streamName) with
                 | None -> return! batched log streamName
+                | Some tokenAndState when opt = Some AllowStale -> return tokenAndState
                 | Some (token, state) -> return! category.LoadFromToken fold state streamName token log }
         member __.TrySync(log : ILogger, token, initialState, events : 'event list) : Async<SyncResult<'state>> = async {
             let! syncRes = category.TrySync fold log (token, initialState) events
@@ -524,12 +526,13 @@ type Resolver<'event,'state>
     let resolveTarget = function AggregateId (cat,streamId) -> sprintf "%s-%s" cat streamId | StreamName streamName -> streamName

     member __.Resolve(target, []?option) =
         match resolveTarget target, option with
-        | sn,None -> resolveStream sn
-        | sn,Some AssumeEmpty -> Stream.ofMemento (context.LoadEmpty sn,initial) (resolveStream sn)
+        | sn,(None|Some AllowStale) -> resolveStream option sn
+        | sn,Some AssumeEmpty -> Stream.ofMemento (context.LoadEmpty sn,initial) (resolveStream option sn)
+    member __.ResolveEx(target,opt) = __.Resolve(target,?option=opt)

     /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
     member __.FromMemento(Token.Unpack token as streamToken, state) =
-        Stream.ofMemento (streamToken,state) (resolveStream token.stream.name)
+        Stream.ofMemento (streamToken,state) (resolveStream None token.stream.name)

 type private SerilogAdapter(log : ILogger) =
     interface EventStore.ClientAPI.ILogger with
diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs
index aacd69fe2..49f61253c 100644
--- a/src/Equinox.MemoryStore/MemoryStore.fs
+++ b/src/Equinox.MemoryStore/MemoryStore.fs
@@ -78,7 +78,7 @@ module private Token =

 /// Represents the state of a set of streams in a style consistent with the concrete Store types - no constraints on memory consumption (but also no persistence!).
 type Category<'event, 'state>(store : VolatileStore, fold, initial) =
     interface ICategory<'event, 'state, string> with
-        member __.Load(log, streamName) = async {
+        member __.Load(log, streamName, _opt) = async {
             match store.TryLoad<'event> streamName log with
             | None -> return Token.ofEmpty streamName initial
             | Some events -> return Token.ofEventArray streamName fold initial events }
@@ -97,11 +97,11 @@ type Category<'event, 'state>(store : VolatileStore, fold, initial) =

 type Resolver<'event, 'state>(store : VolatileStore, fold, initial) =
     let category = Category<'event,'state>(store, fold, initial)
-    let resolveStream streamName = Stream.create category streamName
+    let resolveStream streamName = Stream.create category None streamName
     let resolveTarget = function AggregateId (cat,streamId) -> sprintf "%s-%s" cat streamId | StreamName streamName -> streamName

     member __.Resolve(target : Target, [] ?option) =
         match resolveTarget target, option with
-        | sn,None -> resolveStream sn
+        | sn,(None|Some AllowStale) -> resolveStream sn
         | sn,Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn)

     member __.ResolveEx(target,opt) = __.Resolve(target,?option=opt)
diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs
index 45422e617..7f3698555 100644
--- a/src/Equinox/Equinox.fs
+++ b/src/Equinox/Equinox.fs
@@ -48,4 +48,8 @@ type Target =
 /// Store-agnostic Context.Resolve Options
 type ResolveOption =
     /// Without consulting Cache or any other source, assume the Stream to be empty for the initial Query or Transact
-    | AssumeEmpty
\ No newline at end of file
+    | AssumeEmpty
+    /// If the Cache holds a value, use that without checking the backing store for updates, implying:
+    /// - maximizing use of OCC for `Stream.Transact`
+    /// - enabling potentially stale reads [in the face of multiple writers] (for `Stream.Query`)
+    | AllowStale
\ No newline at end of file
diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
index 438bfb983..4385cbe85 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
@@ -14,28 +14,28 @@ module Cart =
     let codec = Domain.Cart.Events.codec
     let createServiceWithoutOptimization connection batchSize log =
         let store = createCosmosContext connection batchSize
-        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching).Resolve
+        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching).ResolveEx
         Backend.Cart.Service(log, resolveStream)
     let projection = "Compacted",snd snapshot
     /// Trigger looking in Tip (we want those calls to occur, but without leaning on snapshots, which would reduce the paths covered)
     let createServiceWithEmptyUnfolds connection batchSize log =
         let store = createCosmosContext connection batchSize
         let unfArgs = Domain.Cart.Folds.isOrigin, fun _ -> Seq.empty
-        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unfolded unfArgs).Resolve
+        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unfolded unfArgs).ResolveEx
         Backend.Cart.Service(log, resolveStream)
     let createServiceWithSnapshotStrategy connection batchSize log =
         let store = createCosmosContext connection batchSize
-        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve
+        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).ResolveEx
         Backend.Cart.Service(log, resolveStream)
     let createServiceWithSnapshotStrategyAndCaching connection batchSize log cache =
         let store = createCosmosContext connection batchSize
         let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
-        let resolveStream = Resolver(store, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).Resolve
+        let resolveStream = Resolver(store, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).ResolveEx
         Backend.Cart.Service(log, resolveStream)
     let createServiceWithRollingUnfolds connection log =
         let store = createCosmosContext connection 1
         let access = AccessStrategy.RollingUnfolds(Domain.Cart.Folds.isOrigin,Domain.Cart.Folds.transmute)
-        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, access).Resolve
+        let resolveStream = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, access).ResolveEx
         Backend.Cart.Service(log, resolveStream)

 module ContactPreferences =
@@ -59,16 +59,18 @@ type Tests(testOutputHelper) =
     inherit TestsWithLogCapture(testOutputHelper)
     let log,capture = base.Log, base.Capture

-    let addAndThenRemoveItems exceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
-        service.FlowAsync(cartId, fun _ctx execute ->
+    let addAndThenRemoveItems optimistic exceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
+        service.FlowAsync(cartId, optimistic, fun _ctx execute ->
             for i in 1..count do
                 execute <| Domain.Cart.AddItem (context, skuId, i)
                 if not exceptTheLastOne || i <> count then
                     execute <| Domain.Cart.RemoveItem (context, skuId) )
     let addAndThenRemoveItemsManyTimes context cartId skuId service count =
-        addAndThenRemoveItems false context cartId skuId service count
+        addAndThenRemoveItems false false context cartId skuId service count
     let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service count =
-        addAndThenRemoveItems true context cartId skuId service count
+        addAndThenRemoveItems false true context cartId skuId service count
+    let addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service count =
+        addAndThenRemoveItems true true context cartId skuId service count

     let verifyRequestChargesMax rus =
         let tripRequestCharges = [ for e, c in capture.RequestCharges -> sprintf "%A" e, c ]
@@ -128,7 +130,7 @@
             return Some (skuId, addRemoveCount) }
         let act prepare (service : Backend.Cart.Service) skuId count =
-            service.FlowAsync(cartId, prepare = prepare, flow = fun _ctx execute ->
+            service.FlowAsync(cartId, false, prepare = prepare, flow = fun _ctx execute ->
                 execute <| Domain.Cart.AddItem (context, skuId, count))
         let eventWaitSet () = let e = new ManualResetEvent(false) in (Async.AwaitWaitHandle e |> Async.Ignore), async { e.Set() |> ignore }
@@ -258,7 +260,7 @@
             return Some (skuId, addRemoveCount) }
         let act prepare (service : Backend.Cart.Service) skuId count =
-            service.FlowAsync(cartId, prepare = prepare, flow = fun _ctx execute ->
+            service.FlowAsync(cartId, false, prepare = prepare, flow = fun _ctx execute ->
                 execute <| Domain.Cart.AddItem (context, skuId, count))
         let eventWaitSet () = let e = new ManualResetEvent(false) in (Async.AwaitWaitHandle e |> Async.Ignore), async { e.Set() |> ignore }
@@ -364,8 +366,16 @@
         // While we now have 12 events, we should be able to read them with a single call
         capture.Clear()
+        let! _ = service2.ReadStale cartId
+        // A Stale read doesn't roundtrip
+        test <@ [] = capture.ExternalCalls @>
         let! _ = service2.Read cartId
         let! _ = service2.Read cartId
         // First is cached because writer emits etag, second remains cached
         test <@ [EqxAct.TipNotModified; EqxAct.TipNotModified] = capture.ExternalCalls @>
+
+        // Optimistic write mode saves the TipNotModified
+        capture.Clear()
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1
+        test <@ [EqxAct.Append] = capture.ExternalCalls @>
     }
\ No newline at end of file
diff --git a/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs b/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs
index 24bc68e88..06a0c40a8 100644
--- a/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs
+++ b/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs
@@ -25,16 +25,16 @@ module Cart =
     let codec = Domain.Cart.Events.codec
     let snapshot = Domain.Cart.Folds.isOrigin, Domain.Cart.Folds.compact
     let createServiceWithoutOptimization log gateway =
-        Backend.Cart.Service(log, Resolver(gateway, Domain.Cart.Events.codec, fold, initial).Resolve)
+        Backend.Cart.Service(log, Resolver(gateway, Domain.Cart.Events.codec, fold, initial).ResolveEx)
     let createServiceWithCompaction log gateway =
-        let resolveStream = Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve
+        let resolveStream = Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).ResolveEx
         Backend.Cart.Service(log, resolveStream)
     let createServiceWithCaching log gateway cache =
         let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
-        Backend.Cart.Service(log, Resolver(gateway, codec, fold, initial, sliding20m).Resolve)
+        Backend.Cart.Service(log, Resolver(gateway, codec, fold, initial, sliding20m).ResolveEx)
     let createServiceWithCompactionAndCaching log gateway cache =
         let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
-        Backend.Cart.Service(log, Resolver(gateway, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).Resolve)
+        Backend.Cart.Service(log, Resolver(gateway, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).ResolveEx)

 module ContactPreferences =
     let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial
@@ -51,16 +51,18 @@ type Tests(testOutputHelper) =
     let testOutput = TestOutputAdapter testOutputHelper

-    let addAndThenRemoveItems exceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
-        service.FlowAsync(cartId, fun _ctx execute ->
+    let addAndThenRemoveItems optimistic exceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
+        service.FlowAsync(cartId, optimistic, fun _ctx execute ->
             for i in 1..count do
                 execute <| Domain.Cart.AddItem (context, skuId, i)
                 if not exceptTheLastOne || i <> count then
                     execute <| Domain.Cart.RemoveItem (context, skuId) )
     let addAndThenRemoveItemsManyTimes context cartId skuId service count =
-        addAndThenRemoveItems false context cartId skuId service count
+        addAndThenRemoveItems false false context cartId skuId service count
     let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service count =
-        addAndThenRemoveItems true context cartId skuId service count
+        addAndThenRemoveItems false true context cartId skuId service count
+    let addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service count =
+        addAndThenRemoveItems true true context cartId skuId service count

     let createLoggerWithCapture () =
         let capture = LogCaptureBuffer()
@@ -125,7 +127,7 @@
             return Some (skuId, addRemoveCount) }
         let act prepare (service : Backend.Cart.Service) log skuId count =
-            service.FlowAsync(cartId, prepare = prepare, flow = fun _ctx execute ->
+            service.FlowAsync(cartId, false, prepare = prepare, flow = fun _ctx execute ->
                 execute <| Domain.Cart.AddItem (context, skuId, count))
         let eventWaitSet () = let e = new ManualResetEvent(false) in (Async.AwaitWaitHandle e |> Async.Ignore), async { e.Set() |> ignore }
@@ -252,26 +254,59 @@
         let cache = Equinox.Cache("cart", sizeMb = 50)
         let gateway = createGesGateway conn batchSize
         let createServiceCached () = Cart.createServiceWithCaching log gateway cache
-        let service1, service2 = createServiceCached (), createServiceCached ()
+        let service1, service2, service3 = createServiceCached (), createServiceCached (), Cart.createServiceWithoutOptimization log gateway
         let cartId = % Guid.NewGuid()

         // Trigger 10 events, then reload
-        do! addAndThenRemoveItemsManyTimes context cartId skuId service1 5
-        let! _ = service2.Read cartId
-
-        // ... should see a single read as we are writes are cached
+        do! addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service1 5
+        test <@ batchForwardAndAppend = capture.ExternalCalls @>
+        let! resStale = service2.ReadStale cartId
+        test <@ batchForwardAndAppend = capture.ExternalCalls @>
+        let! resFresh = service2.Read cartId
+        // Because we're caching writes, stale vs fresh reads are equivalent
+        test <@ resStale = resFresh @>
+        // ... should see a write plus a batched forward read as position is cached
         test <@ batchForwardAndAppend @ singleBatchForward = capture.ExternalCalls @>

         // Add two more - the roundtrip should only incur a single read
         capture.Clear()
         let skuId2 = SkuId <| Guid.NewGuid()
-        do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1
+        do! addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId2 service1 1
         test <@ batchForwardAndAppend = capture.ExternalCalls @>

         // While we now have 12 events, we should be able to read them with a single call
         capture.Clear()
-        let! _ = service2.Read cartId
+        // Do a stale read - we will see our writes
+        let! res = service2.ReadStale cartId
+        // result after 10 should be different to result after 12
+        test <@ res <> resFresh @>
+        // but we don't do a roundtrip to get it
+        test <@ [] = capture.ExternalCalls @>
+        let! resDefault = service2.Read cartId
         test <@ singleBatchForward = capture.ExternalCalls @>
+
+        // Optimistic transactions
+        capture.Clear()
+        // As the cache is up to date, we can transact against the cached value and do a null transaction without a roundtrip
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId2 service1 1
+        test <@ [] = capture.ExternalCalls @>
+        // As the cache is up to date, we can do an optimistic append, saving a Read roundtrip
+        let skuId3 = SkuId <| Guid.NewGuid()
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId3 service1 1
+        // this time, we did something, so we see the append call
+        test <@ [EsAct.Append] = capture.ExternalCalls @>
+
+        // If we don't have a cache attached, we don't benefit from / pay the price for any optimism
+        capture.Clear()
+        let skuId4 = SkuId <| Guid.NewGuid()
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId4 service3 1
+        // Need 2 batches to do the reading
+        test <@ [EsAct.SliceForward] @ singleBatchForward @ [EsAct.Append] = capture.ExternalCalls @>
+        // we've engineered a clash with the cache state (service3 doesn't participate in caching)
+        // Conflict with cached state leads to a read forward to resync; then we'll idempotently decide not to do any append
+        capture.Clear()
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId4 service2 1
+        test <@ [EsAct.AppendConflict; EsAct.SliceForward; EsAct.BatchForward] = capture.ExternalCalls @>
     }

 []
diff --git a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs
index 2bdab67bd..eee552dda 100644
--- a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs
+++ b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs
@@ -7,7 +7,7 @@
 let createMemoryStore () =
     new VolatileStore()
 let createServiceMemory log store =
-    Backend.Cart.Service(log, Resolver(store, Domain.Cart.Folds.fold, Domain.Cart.Folds.initial).Resolve)
+    Backend.Cart.Service(log, Resolver(store, Domain.Cart.Folds.fold, Domain.Cart.Folds.initial).ResolveEx)

 #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
@@ -26,10 +26,10 @@ type Tests(testOutputHelper) =

         // Act: Run the decision twice...
         let actTrappingStateAsSaved cartId =
-            service.FlowAsync(cartId, flow)
+            service.FlowAsync(cartId, false, flow)
         let actLoadingStateSeparately cartId = async {
-            let! _ = service.FlowAsync(cartId, flow)
+            let! _ = service.FlowAsync(cartId, false, flow)
             return! service.Read cartId }
         let! expected = cartId1 |> actTrappingStateAsSaved
         let! actual = cartId2 |> actLoadingStateSeparately
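
Usage note (not part of the patch): a minimal sketch of how a caller might exercise the new ResolveOption.AllowStale path via the reworked Backend.Cart.Service surface in this diff. The `sketch` function name is illustrative; `service`, `cartId`, `context` and `skuId` are assumed to be wired up as in the integration tests above, and the member signatures used are the ones introduced here.

    // Illustrative sketch only - exercises the FlowAsync(cartId, optimistic, flow), Read and ReadStale members added in this patch
    let sketch (service : Backend.Cart.Service) cartId context skuId = async {
        // optimistic=true resolves with AllowStale: with a cache attached, the decision runs against the
        // cached state first, skipping the initial read roundtrip (OCC still guards the append)
        do! service.FlowAsync(cartId, true, fun _ctx execute ->
            execute <| Domain.Cart.AddItem (context, skuId, 1))
        // Query through the cache without a store roundtrip (may be stale when there are other writers)
        let! staleView = service.ReadStale cartId
        // Synchronous read-your-writes view: goes to the store (or revalidates the cached position)
        let! freshView = service.Read cartId
        return staleView, freshView }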