Skip to content

Commit

Permalink
MemoryStore: Fix Version for post-context in Equinox.Decider.Transact…
Browse files Browse the repository at this point in the history
…Ex (#296)
  • Loading branch information
bartelink authored Nov 10, 2021
1 parent 10ee545 commit 2e8d857
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 81 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed
### Removed
### Fixed

- `MemoryStore`: Fixed incorrect `Version` computation for `TransactEx` post-State [#296](https://github.com/jet/equinox/pull/296)

<a name="3.0.4"></a>
## [3.0.4] - 2021-10-15
Expand Down
9 changes: 9 additions & 0 deletions samples/Store/Domain.Tests/FavoritesTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ open System
let mkFavorite skuId = Favorited { date = DateTimeOffset.UtcNow; skuId = skuId }
let mkUnfavorite skuId = Unfavorited { skuId = skuId }

type Command =
| Favorite of date : DateTimeOffset * skuIds : SkuId list
| Unfavorite of skuId : SkuId

let interpret (command : Command) =
match command with
| Favorite (date, skus) -> decideFavorite date skus
| Unfavorite sku -> decideUnfavorite sku

/// Put the aggregate into the state where the command should trigger an event; verify correct events are yielded
let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =
let initialEvents = command |> function
Expand Down
49 changes: 23 additions & 26 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,41 @@ module Fold =
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let snapshot state = Events.Snapshotted { net = state }

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
| Unfavorite of skuId : SkuId

let interpret command (state : Fold.State) =
let doesntHave skuId = state |> Array.exists (fun x -> x.skuId = skuId) |> not
match command with
| Favorite (date = date; skuIds = skuIds) ->
[ for skuId in Seq.distinct skuIds do
if doesntHave skuId then
yield Events.Favorited { date = date; skuId = skuId } ]
| Unfavorite skuId ->
if doesntHave skuId then [] else
[ Events.Unfavorited { skuId = skuId } ]
let doesntHave skuId (state : Fold.State) = state |> Array.exists (fun x -> x.skuId = skuId) |> not

type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =
let decideFavorite date skuIds state =
[ for skuId in Seq.distinct skuIds do
if state |> doesntHave skuId then
yield Events.Favorited { date = date; skuId = skuId } ]

let execute clientId command : Async<unit> =
let decider = resolve clientId
decider.Transact(interpret command)
let decideUnfavorite skuId state =
if state |> doesntHave skuId then [] else
[ Events.Unfavorited { skuId = skuId } ]

member __.Execute(clientId, command) =
execute clientId command
type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

member __.Favorite(clientId, skus) =
execute clientId (Command.Favorite(System.DateTimeOffset.Now, skus))
member _.Favorite(clientId, skus, ?at) =
let decider = resolve clientId
decider.Transact(decideFavorite (defaultArg at System.DateTimeOffset.Now) skus)

member __.Unfavorite(clientId, sku) =
execute clientId (Command.Unfavorite sku)
member _.Unfavorite(clientId, sku) =
let decider = resolve clientId
decider.Transact(decideUnfavorite sku)

member __.List clientId : Async<Events.Favorited []> =
member _.List clientId : Async<Events.Favorited []> =
let decider = resolve clientId
decider.Query(id)

member __.ListWithVersion clientId : Async<int64 * Events.Favorited []> =
member _.ListWithVersion clientId : Async<int64 * Events.Favorited []> =
let decider = resolve clientId
decider.QueryEx(fun ctx -> ctx.Version, ctx.State)

// NOTE not a real world example - used for an integration test; TODO get a better example where it's actually relevant
member _.UnfavoriteWithPostVersion(clientId, sku) =
let decider = resolve clientId
decider.TransactEx((fun c -> async { return (), decideUnfavorite sku c.State }),
fun _r c -> c.Version)

let create log resolveStream =
let resolve id = Equinox.Decider(log, resolveStream (streamName id), maxAttempts = 3)
Service(resolve)
14 changes: 11 additions & 3 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,26 @@ let createServiceCosmosUnoptimizedButCached log context =
let cat = CosmosStore.CosmosStoreCategory(context, codec, fold, initial, caching, access)
Favorites.create log cat.Resolve

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
| Unfavorite of skuId : SkuId

let execute (service : Favorites.Service) clientId = function
| Favorite (date, skus) -> service.Favorite(clientId, skus, at = date)
| Unfavorite sku -> service.Unfavorite(clientId, sku)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
let createLog () = createLogger testOutput

let act (service : Favorites.Service) (clientId, command) = async {
do! service.Execute(clientId, command)
do! execute service clientId command
let! version, items = service.ListWithVersion clientId

match command with
| Domain.Favorites.Favorite (_, skuIds) ->
| Favorite (_, skuIds) ->
test <@ skuIds |> List.forall (fun skuId -> items |> Array.exists (function { skuId = itemSkuId} -> itemSkuId = skuId)) @>
| Domain.Favorites.Unfavorite _ ->
| Unfavorite _ ->
test <@ Array.isEmpty items @>
return version, items }

Expand Down
76 changes: 26 additions & 50 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,6 @@ open Equinox
open Equinox.Core
open System.Runtime.InteropServices

/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
exception private WrongVersionException of value : obj

/// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary
[<NoEquality; NoComparison>]
type ConcurrentDictionarySyncResult<'t> = Written of 't | Conflict

/// Response type for VolatileStore.TrySync to communicate the outcome and updated state of a stream
[<NoEquality; NoComparison>]
type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't

/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
type VolatileStore<'Format>() =

Expand All @@ -35,27 +24,23 @@ type VolatileStore<'Format>() =
[<CLIEvent>]
/// Notifies of a batch of events being committed to a given Stream. Guarantees no out of order and/or overlapping raising of the event<br/>
/// NOTE in some cases, two or more overlapping commits can be coalesced into a single <c>Committed</c> event
member __.Committed : IEvent<FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[]> = committed.Publish
member _.Committed : IEvent<FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[]> = committed.Publish

/// Loads state from a given stream
member __.TryLoad streamName = match streams.TryGetValue streamName with false, _ -> None | true, packed -> Some packed
member _.TryLoad streamName = match streams.TryGetValue streamName with false, _ -> None | true, packed -> Some packed

/// Attempts a synchronization operation - yields conflicting value if sync function decides there is a conflict
member __.TrySync
( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult<FsCodec.ITimelineEvent<'Format>[]>,
events: FsCodec.ITimelineEvent<'Format>[])
: Async<ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]>> = async {
/// Attempts a synchronization operation - yields conflicting value if expectedCount does not match
member _.TrySync(streamName, expectedCount, events) : Async<bool * FsCodec.ITimelineEvent<'Format>[]> = async {
let seedStream _streamName = events
let updateValue _streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict -> raise <| WrongVersionException (box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue)
// we publish the event here, once, as `updateValue` can be invoked multiple times
if Array.length currentValue <> expectedCount then currentValue
else Array.append currentValue events
match streams.AddOrUpdate(streamName, seedStream, updateValue) with
| res when obj.ReferenceEquals(Array.last res, Array.last events) ->
// we publish the event here rather than inside updateValue, once, as that can be invoked multiple times
do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events))
return Written res
with WrongVersionException conflictingValue ->
return Conflict (unbox conflictingValue) }
return true, res
| res -> return false, res }

type Token = { streamName : string; eventCount : int }

Expand All @@ -67,47 +52,38 @@ module private Token =
version = int64 eventCount }
let (|Unpack|) (token: StreamToken) : Token = unbox<Token> token.value
/// Represent a stream known to be empty
let ofEmpty streamName initial = streamTokenOfEventCount streamName 0, initial
let tokenOfArray streamName (value: 'event array) = Array.length value |> streamTokenOfEventCount streamName
let tokenOfSeq streamName (value: 'event seq) = Seq.length value |> streamTokenOfEventCount streamName
/// Represent a known array of events (without a known folded State)
let ofEventArray streamName fold initial (events: 'event array) = tokenOfArray streamName events, fold initial (Seq.ofArray events)
/// Represent a known array of Events together with the associated state
let ofEventArrayAndKnownState streamName fold (state: 'state) (events: 'event seq) = tokenOfSeq streamName events, fold state events
let ofEmpty streamName = streamTokenOfEventCount streamName 0
let ofValue streamName (value : 'event array) = streamTokenOfEventCount streamName value.Length

/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
interface ICategory<'event, 'state, string, 'context> with
member __.Load(_log, streamName, _opt) = async {
member _.Load(_log, streamName, _opt) = async {
match store.TryLoad streamName with
| None -> return Token.ofEmpty streamName initial
| Some events -> return Token.ofEventArray streamName fold initial (events |> Array.choose codec.TryDecode) }
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
| None -> return Token.ofEmpty streamName, initial
| Some value -> return Token.ofValue streamName value, fold initial (value |> Seq.choose codec.TryDecode) }
member _.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
let encoded = events |> Seq.mapi (fun i e -> map (token.eventCount + i) (codec.Encode(context, e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.eventCount then ConcurrentDictionarySyncResult.Conflict
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
match! store.TrySync(token.streamName, trySyncValue, encoded) with
| ConcurrentArraySyncResult.Written _ ->
return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
match! store.TrySync(token.streamName, token.eventCount, encoded) with
| true, streamEvents' ->
return SyncResult.Written (Token.ofValue token.streamName streamEvents', fold state events)
| false, conflictingEvents ->
let resync = async {
let token' = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip token.eventCount |> List.ofSeq
return token', fold state (successorEvents |> Seq.choose codec.TryDecode) }
let token' = Token.ofValue token.streamName conflictingEvents
return token', fold state (conflictingEvents |> Seq.skip token.eventCount |> Seq.choose codec.TryDecode) }
return SyncResult.Conflict resync }

type MemoryStoreCategory<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context

member __.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
member _.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
match FsCodec.StreamName.toString streamName, option with
| sn, (None | Some AllowStale) -> resolveStream sn context
| sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context)
| sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn, initial) (resolveStream sn context)

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack stream as streamToken, state, [<Optional; DefaultParameterValue null>] ?context) =
member _.FromMemento(Token.Unpack stream as streamToken, state, [<Optional; DefaultParameterValue null>] ?context) =
Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context)
15 changes: 13 additions & 2 deletions tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,16 @@ type ChangeFeed(testOutputHelper) =
stream = expectedStream
&& env.Index = 1L
&& env.EventType = "Unfavorited"
&& env.Data |> unbox<Domain.Favorites.Events.Unfavorited> |> fun x -> x.skuId = sku @>
}
&& env.Data |> unbox<Domain.Favorites.Events.Unfavorited> |> fun x -> x.skuId = sku @> }

type Versions(testOutputHelper) =
let log = TestOutputAdapter testOutputHelper |> createLogger

[<AutoData>]
let ``Post-Version is computed correctly`` (clientId, sku) = Async.RunSynchronously <| async {
let store = createMemoryStore ()
let service = createFavoritesServiceMemory log store

do! service.Favorite(clientId, [sku])
let! postVersion = service.UnfavoriteWithPostVersion(clientId, sku)
test <@ 1L + 1L = postVersion @> }

0 comments on commit 2e8d857

Please sign in to comment.