Skip to content

Commit

Permalink
Remove Store namespace; Files reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 22, 2019
1 parent 82494bb commit 453672d
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 117 deletions.
14 changes: 7 additions & 7 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,14 @@ while these are not omnipresent, for the purposes of this discussion we’ll tre

### Flows, Streams and Accumulators

Equinox’s Command Handling consists of < 250 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch are in a normal application are:
Equinox’s Command Handling consists of < 200 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch in a normal application are:

- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs#L36) - internal implementation of Optimistic Concurrency Control / retry loop used by `Stream`. It's recommended to at least scan this file as it defines the Transaction semantics everything is coming together in service of.
- [`type Stream`](https://github.com/jet/equinox/blob/master/src/Equinox/Stream.fs#L11) - surface API one uses to `Transact` or `Query` against a specific stream
- [`type Target` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Stream.fs#L39) - used to identify the Stream pertaining to the relevant Aggregate that `resolveStream` will use to hydrate a `Stream`
- _[`type Accumulator`](https://github.com/jet/equinox/blob/master/src/Equinox/Accumulator.fs) - optional `type` that can be used to manage application-local State in some flavors of Service__
- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs#L34) - internal implementation of Optimistic Concurrency Control / retry loop used by `Stream`. It's recommended to at least scan this file as it defines the Transaction semantics everything is coming together in service of.
- [`type Stream`](https://github.com/jet/equinox/blob/master/src/Equinox/Equinox.fs#L11) - surface API one uses to `Transact` or `Query` against a specific stream
- [`type Target` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Equinox.fs#L42) - used to identify the Stream pertaining to the relevant Aggregate that `resolveStream` will use to hydrate a `Stream`
- _[`type Accumulator`](https://github.com/jet/equinox/blob/master/src/Equinox/Accumulator.fs) - optional `type` that can be used to manage application-local State in some complex flavors of Service__

Its recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the few hundred lines can tell many of the thousands of words about to follow!
Its recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the 3 files can tell many of the thousands of words about to follow!

#### Stream Members

Expand Down Expand Up @@ -342,7 +342,7 @@ type Service(log, stream, ?maxAttempts) =
inner.Query id
```

The `Stream`-related functions in a given Aggregate establish the access patterns used across when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to a `Equinox.Stream` equivalent (see [`src/Equinox/Stream.fs`](src/Equinox/Stream.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs).
The `Stream`-related functions in a given Aggregate establish the access patterns used across when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to a `Equinox.Stream` equivalent (see [`src/Equinox/Equinox.fs`](src/Equinox/Equinox.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs).

`Read` above will do a roundtrip to the Store in order to fetch the most recent state (while this can be optimized by reading through the cache, each invocation will hit the store regardless). This Synchronous Read can be used to [Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency) to establish a state incorporating the effects of any Command invocation you know to have been completed.

Expand Down
2 changes: 0 additions & 2 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace Equinox.Cosmos.Store

open Equinox.Storage
open Equinox.Store
open FsCodec
open Microsoft.Azure.Documents
open Newtonsoft.Json
Expand Down Expand Up @@ -771,7 +770,6 @@ namespace Equinox.Cosmos
open Equinox
open Equinox.Cosmos.Store
open Equinox.Storage
open Equinox.Store
open FsCodec
open FSharp.Control
open Microsoft.Azure.Documents
Expand Down
1 change: 0 additions & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

open Equinox
open Equinox.Storage
open Equinox.Store
open EventStore.ClientAPI
open Serilog // NB must shadow EventStore.ClientAPI.ILogger
open System
Expand Down
1 change: 0 additions & 1 deletion src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Equinox.MemoryStore

open Equinox
open Equinox.Storage
open Equinox.Store
open Serilog
open System.Runtime.InteropServices

Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.Storage/Equinox.Storage.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Types.fs" />
<Compile Include="StorageStream.fs" />
<Compile Include="Stream.fs" />
<Compile Include="Retry.fs" />
<Compile Include="AsyncCacheCell.fs" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/// Low level stream builders, generally consumed via Store-specific Stream Builders that layer policies such as Caching in at the Category level
module Equinox.Storage.Stream

open Equinox.Store

/// Represents a specific stream in a ICategory
type private Stream<'event, 'state, 'streamId>(category : ICategory<'event, 'state, 'streamId>, streamId: 'streamId) =
interface IStream<'event, 'state> with
Expand Down
1 change: 0 additions & 1 deletion src/Equinox.Storage/Types.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Equinox.Storage

open Equinox.Store
open Serilog
open System
open System.Diagnostics
Expand Down
5 changes: 3 additions & 2 deletions src/Equinox/Stream.fs → src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Equinox

open Equinox.Storage
open System.Runtime.InteropServices

// Exception yielded by Stream.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store
Expand All @@ -8,7 +9,7 @@ type MaxResyncsExhaustedException(count) =

/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Stream<'event, 'state>
( log, stream : Store.IStream<'event, 'state>, maxAttempts : int,
( log, stream : Storage.IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>]?mkAttemptsExhaustedException,
[<Optional; DefaultParameterValue(null)>]?resyncPolicy) =
let transact f =
Expand All @@ -34,7 +35,7 @@ type Stream<'event, 'state>

/// Low-level helper to allow one to obtain a reference to a stream and state pair (including the position) in order to pass it as a continuation within the application
/// Such a memento is then held within the application and passed in lieu of a StreamId to the StreamResolver in order to avoid having to reload state
member __.CreateMemento(): Async<Store.StreamToken * 'state> = Flow.query(stream, log, fun syncState -> syncState.Memento)
member __.CreateMemento(): Async<StreamToken * 'state> = Flow.query(stream, log, fun syncState -> syncState.Memento)

/// Store-agnostic way to specify a target Stream to a Resolver
[<NoComparison; NoEquality>]
Expand Down
5 changes: 2 additions & 3 deletions src/Equinox/Equinox.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Interfaces.fs" />
<Compile Include="Flow.fs" />
<Compile Include="Stream.fs" />
<Compile Include="Equinox.fs" />
<Compile Include="Accumulator.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-18618-05" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-18618-05" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="2.0.0-alpha.2" PrivateAssets="All" />

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
Expand Down
157 changes: 93 additions & 64 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
@@ -1,75 +1,104 @@
/// Internal implementation of the Store agnostic load + run/render. See Stream.fs for App-facing APIs.
module internal Equinox.Flow
/// Internal data structures/impl. While these are intended to be legible, understanding the abstractions involved is only necessary if you are implementing a Store or a decorator thereof.
/// i.e., if you're seeking to understand the main usage flows of the Equinox library, that's in Equinox.fs, not here
namespace Equinox.Storage

open Equinox.Store
open Serilog

/// Represents stream and folding state between the load and run/render phases
type SyncState<'event, 'state>
( originState : StreamToken * 'state,
trySync : ILogger -> StreamToken * 'state -> 'event list -> Async<SyncResult<'state>>) =
let mutable tokenAndState = originState
/// Store-specific opaque token to be used for synchronization purposes
[<NoComparison>]
type StreamToken = { value : obj; version: int64 }

member __.Memento = tokenAndState
member __.State = snd __.Memento
member __.Version = (fst __.Memento).version
/// Internal type used to represent the outcome of a TrySync operation
[<NoEquality; NoComparison; RequireQualifiedAccess>]
type SyncResult<'state> =
/// The write succeeded (the supplied token and state can be used to efficiently continue the processing iff desired)
| Written of StreamToken * 'state
/// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store
/// The inner is Async as some stores (and/or states) are such that determining the conflicting state (iff required) needs an extra trip to obtain
| Conflict of Async<StreamToken * 'state>

member __.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async {
let! res = trySync log tokenAndState events
match res with
| SyncResult.Conflict resync ->
return! handleFailureResync resync
| SyncResult.Written (token', streamState') ->
tokenAndState <- token', streamState'
return true }
member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> =
let resyncInPreparationForRetry resync = async {
let! streamState' = runResync log attemptNumber resync
tokenAndState <- streamState'
return false }
__.TryOr(log, events, resyncInPreparationForRetry)
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type IStream<'event, 'state> =
/// Obtain the state from the target stream
abstract Load: log: ILogger
-> Async<StreamToken * 'state>
/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream
/// SyncResult.Written: implies the state is now the value represented by the Result's value
/// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry
abstract TrySync: log: ILogger
-> token: StreamToken * originState: 'state
-> events: 'event list
-> Async<SyncResult<'state>>

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(syncState : SyncState<'event, 'state>)
(decide : 'state -> Async<'result * 'event list>)
: Async<'result> =
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt: Async<'result> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide syncState.State
if List.isEmpty events then
log.Debug "No events generated"
return result
elif attempt = maxSyncAttempts then
log.Debug "Max Sync Attempts exceeded"
let! comitted = syncState.TryOr(log, events, fun _resync -> async { return false })
/// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs.
module internal Flow =

if not comitted then
return raise (createMaxAttemptsExhaustedException attempt)
else
/// Represents stream and folding state between the load and run/render phases
type SyncState<'event, 'state>
( originState : StreamToken * 'state,
trySync : ILogger -> StreamToken * 'state -> 'event list -> Async<SyncResult<'state>>) =
let mutable tokenAndState = originState

member __.Memento = tokenAndState
member __.State = snd __.Memento
member __.Version = (fst __.Memento).version

member __.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async {
let! res = trySync log tokenAndState events
match res with
| SyncResult.Conflict resync ->
return! handleFailureResync resync
| SyncResult.Written (token', streamState') ->
tokenAndState <- token', streamState'
return true }
member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> =
let resyncInPreparationForRetry resync = async {
let! streamState' = runResync log attemptNumber resync
tokenAndState <- streamState'
return false }
__.TryOr(log, events, resyncInPreparationForRetry)

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(syncState : SyncState<'event, 'state>)
(decide : 'state -> Async<'result * 'event list>)
: Async<'result> =
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt: Async<'result> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide syncState.State
if List.isEmpty events then
log.Debug "No events generated"
return result
else
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
elif attempt = maxSyncAttempts then
log.Debug "Max Sync Attempts exceeded"
let! comitted = syncState.TryOr(log, events, fun _resync -> async { return false })

if not comitted then
return raise (createMaxAttemptsExhaustedException attempt)
else
return result
else
return result }
/// Commence, processing based on the incoming state
loop 1
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
else
return result }
/// Commence, processing based on the incoming state
loop 1

let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide }
let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide }

let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
return project syncState }
let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
return project syncState }
31 changes: 0 additions & 31 deletions src/Equinox/Interfaces.fs

This file was deleted.

4 changes: 2 additions & 2 deletions tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module Equinox.EventStore.Tests.EventStoreTokenTests

open Equinox
open Equinox.EventStore
open Equinox.Storage
open FsCheck.Xunit
open Swensen.Unquote.Assertions
open Xunit

let unpack (Token.Unpack token : Store.StreamToken) =
let unpack (Token.Unpack token : StreamToken) =
token.pos.streamVersion, token.pos.compactionEventNumber, token.pos.batchCapacityLimit

[<Theory
Expand Down

0 comments on commit 453672d

Please sign in to comment.