forked from jet/equinox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFlow.fs
74 lines (66 loc) · 3.62 KB
/
Flow.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/// Internal implementation of the Store agnostic load + run/render. See Stream.fs for App-facing APIs.
module internal Equinox.Flow
open Equinox.Store
open Serilog
/// Carries the stream token and folded state from the load phase through to the run/render phase
type SyncState<'event, 'state>
    (   originState : StreamToken * 'state,
        trySync : ILogger -> StreamToken * 'state -> 'event list -> Async<SyncResult<'state>>) =

    // Most recently observed (token, state) pair; replaced after a successful write or a resync
    let mutable current = originState

    /// The (token, state) pair as currently held
    member this.Memento = current

    /// The state component of the currently held pair
    member this.State = snd current

    /// Attempts to sync `events` against the currently held token and state.
    /// When the write is accepted, the held pair is replaced with the one returned and the result is `true`.
    /// When a conflict is reported, the outcome is whatever `handleFailureResync` yields for the supplied resync computation.
    member this.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async {
        let! outcome = trySync log current events
        match outcome with
        | SyncResult.Written (token', state') ->
            current <- (token', state')
            return true
        | SyncResult.Conflict resync ->
            return! handleFailureResync resync }

    /// As `TryOr`, except that a conflict is handled by executing `runResync log attemptNumber resync`,
    /// storing the pair it yields as the held token and state, and yielding `false` so the caller can retry.
    member this.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> =
        let refreshThenSignalRetry resync = async {
            let! refreshed = runResync log attemptNumber resync
            current <- refreshed
            return false }
        this.TryOr(log, events, refreshThenSignalRetry)
/// Process a command, ensuring a consistent final state is established on the stream.
/// 1.  make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2c. if conflicting changes, retry by recommencing at step 1 with the updated state
///     (on the final permitted attempt no resync takes place; the exception produced by
///     `createMaxAttemptsExhaustedException attempt` is raised instead)
/// Raises `System.ArgumentOutOfRangeException` if `maxSyncAttempts` is less than 1.
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
        (syncState : SyncState<'event, 'state>)
        (decide : 'state -> Async<'result * 'event list>)
        : Async<'result> =
    // Validated outside the async block, so this is raised as soon as `run` is applied
    // rather than when the returned Async is executed
    if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
    /// Run a decision cycle - decide what events should be appended given the presented state
    let rec loop attempt : Async<'result> = async {
        // Only retries carry the attempt number as a log context property
        let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
        let! result, events = decide syncState.State
        if List.isEmpty events then
            log.Debug "No events generated"
            return result
        elif attempt = maxSyncAttempts then
            // Final attempt: a conflict is not followed by a resync as no further retry will be made
            let! committed = syncState.TryOr(log, events, fun _resync -> async { return false })
            if not committed then
                // Logged only once the last attempt has actually failed
                log.Debug "Max Sync Attempts exceeded"
                return raise (createMaxAttemptsExhaustedException attempt)
            else
                return result
        else
            let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events)
            if not committed then
                log.Debug "Resyncing and retrying"
                return! loop (attempt + 1)
            else
                return result }
    /// Commence, processing based on the incoming state
    loop 1
/// Loads the stream via `stream.Load`, wraps the result in a `SyncState` bound to `stream.TrySync`,
/// then hands over to `run` with the supplied retry settings and `decide` function.
let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async {
    let! loaded = stream.Load log
    let context = SyncState(loaded, stream.TrySync)
    let retrySettings = maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException
    return! run log retrySettings context decide }
/// Loads the stream via `stream.Load`, wraps the result in a `SyncState` bound to `stream.TrySync`,
/// and yields the result of applying `project` to it. No sync is attempted here.
let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async {
    let! loaded = stream.Load log
    return SyncState(loaded, stream.TrySync) |> project }