Skip to content

Commit

Permalink
Process Manager wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 21, 2020
1 parent 66faf2b commit 33e4a9b
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 79 deletions.
50 changes: 35 additions & 15 deletions equinox-fc/Domain/Inventory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type internal IdsCache<'Id>() =

/// Maintains active Epoch Id in a thread-safe manner while ingesting items into the `series` of `epochs`
/// Prior to first add, reads `lookBack` epochs to seed the cache, in order to minimize the number of duplicated Ids we ingest
type Service internal (inventoryId, series : Series.Service, epochs : Epoch.Service, lookBack, capacity) =
type Service2 internal (inventoryId, series : Series.Service, epochs : Epoch.Service, lookBack, capacity) =

let log = Serilog.Log.ForContext<Service>()
let log = Serilog.Log.ForContext<Service2>()

// Maintains what we believe to be the currently open EpochId
// Guaranteed to be set only after `previousIds.AwaitValue()`
Expand Down Expand Up @@ -74,7 +74,7 @@ module internal Helpers =
let remainingEpochCapacity (state: Epoch.Fold.State) =
let currentLen = state.ids.Count
max 0 (maxTransactionsPerEpoch - currentLen)
Service(inventoryId, series, epochs, lookBack = lookBackLimit, capacity = remainingEpochCapacity)
Service2(inventoryId, series, epochs, lookBack = lookBackLimit, capacity = remainingEpochCapacity)

module Cosmos =

Expand All @@ -85,15 +85,35 @@ module Cosmos =

module Processor =

type Service(transactions : Transaction.Service, locations : Fc.Location.Service, inventory : Service) =

member __.Apply(inventoryId, transactionId, update) = async {
let! action = transactions.Apply(transactionId, update)
match action with
| Transaction.Adjust (loc, qty) -> locations.Execute
| Remove of LocationId * int
| Add of LocationId * int
| Log of TerminalState
| Finish
service.Ingest
}
type Service(transactions : Transaction.Service, locations : Fc.Location.Service, inventory : Service2) =

let execute transactionId =
let f = Fc.Location.Epoch.decide transactionId
let rec aux update = async {
let! action = transactions.Apply(transactionId, update)
match action with
| Transaction.Adjust (loc, bal) ->
match! locations.Execute(loc, f (Fc.Location.Epoch.Reset bal)) with
| Fc.Location.Epoch.Accepted _ -> do! aux (Transaction.Events.Adjusted)
| Fc.Location.Epoch.DupFromPreviousEpoch -> failwith "TODO walk back to previous epoch"
| Transaction.Remove (loc, delta) ->
match! locations.Execute(loc, f (Fc.Location.Epoch.Remove delta)) with
| Fc.Location.Epoch.Accepted bal -> do! aux (Transaction.Events.Removed { balance = bal })
| Fc.Location.Epoch.DupFromPreviousEpoch -> failwith "TODO walk back to previous epoch"
| Transaction.Add (loc, delta) ->
match! locations.Execute(loc, f (Fc.Location.Epoch.Add delta)) with
| Fc.Location.Epoch.Accepted bal -> do! aux (Transaction.Events.Added { balance = bal })
| Fc.Location.Epoch.DupFromPreviousEpoch -> failwith "TODO walk back to previous epoch"
| Transaction.Log (Transaction.Adjusted e) ->
let! _count = inventory.Ingest([Fc.Inventory.Epoch.Events.Adjusted { transactionId = transactionId }])
do! aux Transaction.Events.Logged
| Transaction.Log (Transaction.Transferred e) ->
let! _count = inventory.Ingest([Fc.Inventory.Epoch.Events.Transferred { transactionId = transactionId }])
do! aux Transaction.Events.Logged
| Transaction.Finish ->
()
}
aux

member __.Apply(transactionId, update) =
execute transactionId update
7 changes: 3 additions & 4 deletions equinox-fc/Domain/InventoryEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ module Events =
let [<Literal>] CategoryId = "InventoryEpoch"
let (|For|) (inventoryId, epochId) = FsCodec.StreamName.compose CategoryId [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]

type TransactionInfo = { transactionId : InventoryTransactionId }

type TransactionRef = { transactionId : InventoryTransactionId }
type Snapshotted = { closed: bool; ids : InventoryTransactionId[] }

type Event =
| Adjusted of TransactionInfo
| Transferred of TransactionInfo
| Adjusted of TransactionRef
| Transferred of TransactionRef
| Closed
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
Expand Down
27 changes: 16 additions & 11 deletions equinox-fc/Domain/InventoryTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,34 @@ module Events =
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

type TerminalState =
| Adjusted of Events.AdjustmentRequested
| Transferred of Added
| TransferFailed of Events.TransferRequested
and Added = { request : Events.TransferRequested; removed : Events.Removed; added : Events.Added }
type Action =
| Adjust of LocationId * int
| Remove of LocationId * int
| Add of LocationId * int
| Log of TerminalState
| Log of LoggingState
| Finish
and LoggingState =
| Adjusted of Events.AdjustmentRequested
| Transferred of Added
and Added = { request : Events.TransferRequested; removed : Events.Removed; added : Events.Added }

module Fold =

type State =
| Initial
| Running of RunningState
| Logging of TerminalState
| Logging of LoggingState
| Completed of TerminalState
and RunningState =
| Adjust of Events.AdjustmentRequested
| Transfer of TransferState
and TransferState =
| Requested of Events.TransferRequested
| Adding of Removed
and TerminalState =
| Adjusted of Events.AdjustmentRequested
| Transferred of Added
| TransferFailed of Events.TransferRequested
and Removed = { request : Events.TransferRequested; removed : Events.Removed }
let initial = Initial
let evolve state event =
Expand All @@ -71,7 +74,7 @@ module Fold =
| Initial, Events.AdjustmentRequested r ->
Running (Adjust r)
| Running (Adjust r), Events.Adjusted ->
Logging (Adjusted r)
Logging (LoggingState.Adjusted r)

(* Transfer Process *)
| Initial, Events.TransferRequested e ->
Expand All @@ -83,11 +86,13 @@ module Fold =
| Running (Transfer (Requested s)), Events.Removed e ->
Running (Transfer (Adding { request = s; removed = e }))
| Running (Transfer (Adding s)), Events.Added e ->
Logging (Transferred { request = s.request; removed = s.removed; added = e })
Logging (LoggingState.Transferred { request = s.request; removed = s.removed; added = e })

(* Log result *)
| Logging s, Events.Logged ->
Completed s
| Logging (LoggingState.Adjusted s), Events.Logged ->
Completed (Adjusted s)
| Logging (LoggingState.Transferred s), Events.Logged ->
Completed (Transferred s)

(* Any disallowed state changes represent gaps in the model, so we fail fast *)
| state, event -> failwithf "Unexpected %A when %A" event state
Expand Down
28 changes: 15 additions & 13 deletions equinox-fc/Domain/Location.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@ namespace Fc.Location

[<NoComparison; NoEquality>]
type Wip<'R> =
| Pending of decide : (Epoch.Fold.Balance -> 'R * Epoch.Events.Event list)
| Pending of decide : (Epoch.Fold.State -> 'R * Epoch.Events.Event list)
| Complete of 'R

/// Manages Reads and Writes for a Series of Epochs, with a running total being carried forward to the next Epoch when it's marked Closed
type Service internal (zeroBalance, shouldClose, series : Series.Service, epochs : Epoch.Service) =
type Service internal (zeroBalance, toBalanceCarriedForward, shouldClose, series : Series.Service, epochs : Epoch.Service) =

let rec execute locationId originEpochId =
let execute locationId originEpochId =
let rec aux epochId balanceToCarryForward wip = async {
let decide state = match wip with Complete r -> r, [] | Pending decide -> decide state
match! epochs.Sync(locationId, epochId, balanceToCarryForward, decide, shouldClose) with
| { balance = bal; result = Some res; isOpen = true } ->
| { result = Some res; isOpen = true } ->
if originEpochId <> epochId then
do! series.AdvanceIngestionEpoch(locationId, epochId)
return bal, res
| { balance = bal; result = Some res } ->
return res
| { history = history; result = Some res } ->
let successorEpochId = LocationEpochId.next epochId
return! aux successorEpochId (Some bal) (Complete res)
| { balance = bal } ->
let cf = toBalanceCarriedForward history
return! aux successorEpochId (Some cf) (Complete res)
| { history = history } ->
let successorEpochId = LocationEpochId.next epochId
return! aux successorEpochId (Some bal) wip }
let cf = toBalanceCarriedForward history
return! aux successorEpochId (Some cf) wip }
aux

member __.Execute(locationId, decide) = async {
Expand All @@ -35,12 +37,12 @@ type Service internal (zeroBalance, shouldClose, series : Series.Service, epochs
[<AutoOpen>]
module Helpers =

let create (zeroBalance, shouldClose) (series, epochs) =
Service(zeroBalance, shouldClose, series, epochs)
let create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) =
Service(zeroBalance, toBalanceCarriedForward, shouldClose, series, epochs)

module Cosmos =

let createService (zeroBalance, shouldClose) (context, cache, maxAttempts) =
let createService (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.createService (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts)
create (zeroBalance, shouldClose) (series, epochs)
create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs)
118 changes: 84 additions & 34 deletions equinox-fc/Domain/LocationEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,71 +10,121 @@ module Events =
let [<Literal>] CategoryId = "LocationEpoch"
let (|For|) (locationId, epochId) = FsCodec.StreamName.compose CategoryId [LocationId.toString locationId; LocationEpochId.toString epochId]

type CarriedForward = { initial : int }
type Delta = { delta : int; transaction : InventoryTransactionId }
type Value = { value : int; transaction : InventoryTransactionId }
type CarriedForward = { initial : int; recentTransactions : InventoryTransactionId[] }
type Event =
| CarriedForward of CarriedForward
| Added of {| delta : int; id : InventoryTransactionId |}
| Removed of {| delta : int; id : InventoryTransactionId |}
| Reset of {| value : int; id : InventoryTransactionId |}
| Closed
| Added of Delta
| Removed of Delta
| Reset of Value
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Fold =

type Balance = int
type OpenState = { count : int; value : Balance }
type State = Initial | Open of OpenState | Closed of Balance
type State =
| Initial
| Open of Record list // reverse order, i.e. most revent first
| Closed of Record list // trimmed
and Record =
| Init of Events.CarriedForward
| Step of Step
and Step = { balance : Balance; id : InventoryTransactionId }
and Balance = int
let initial = Initial
let (|Current|) = function
| (Init { initial = bal } | Step { balance = bal }) :: _ -> bal
| [] -> failwith "Cannot transact when no CarriedForward"
let evolve state event =
match event, state with
| Events.CarriedForward e, Initial -> Open { count = 0; value = e.initial }
| Events.Added e, Open bal -> Open { count = bal.count + 1; value = bal.value + e.delta }
| Events.Removed e, Open bal -> Open { count = bal.count + 1; value = bal.value - e.delta }
| Events.Reset e, Open bal -> Open { count = bal.count + 1; value = e.value }
| Events.Closed, Open { value = bal } -> Closed bal
| Events.CarriedForward _, (Open _|Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x
| (Events.Added _|Events.Removed _|Events.Reset _|Events.Closed) as e, (Initial|Closed _ as s) -> failwithf "Unexpected %A when %A" e s
| Events.CarriedForward e, Initial -> Open [Init e]
| Events.Added e, Open (Current cur as log) -> Open (Step { id = e.id ; balance = cur + e.delta } :: log)
| Events.Removed e, Open (Current cur as log) -> Open (Step { id = e.id ; balance = cur - e.delta } :: log)
| Events.Reset e, Open log -> Open (Step { id = e.id ; balance = e.value } :: log)
| Events.Closed, Open log -> Closed log
| Events.CarriedForward _, (Open _ | Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x
| (Events.Added _ | Events.Removed _ | Events.Reset _ | Events.Closed) as e, (Initial | Closed _ as s) ->
failwithf "Unexpected %A when %A" e s
let fold = Seq.fold evolve

/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events
type private Accumulator() =
let acc = ResizeArray()
member __.Ingest state : 'res * Events.Event list -> 'res * Fold.State = function
| res, [] -> res, state
| res, [e] -> acc.Add e; res, Fold.evolve state e
| res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs)
| res, [] -> res, state
| res, [e] -> acc.Add e; res, Fold.evolve state e
| res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs)
member __.Accumulated = List.ofSeq acc

type Result<'t> = { balance : Fold.Balance; result : 't option; isOpen : bool }
type Result<'t> = { history : Fold.Record list; result : 't option; isOpen : bool }

let sync (balanceCarriedForward : Fold.Balance option) (decide : Fold.Balance -> 't*Events.Event list) shouldClose state : Result<'t>*Events.Event list =
let sync (carriedForward : Events.CarriedForward option)
(decide : Fold.State -> 't * Events.Event list)
shouldClose
state
: Result<'t> * Events.Event list =

let acc = Accumulator()
// We require a CarriedForward event at the start of any Epoch's event stream
// 1. Guarantee a CarriedForward event at the start of any Epoch's event stream
let (), state =
acc.Ingest state <|
match state with
| Fold.Initial -> (), [Events.CarriedForward { initial = Option.get balanceCarriedForward }]
| Fold.Initial -> (), [Events.CarriedForward (Option.get carriedForward )]
| Fold.Open _ | Fold.Closed _ -> (), []
// Run, unless we determine we're in Closed state
// 2. Transact (unless we determine we're in Closed state)
let result, state =
acc.Ingest state <|
match state with
| Fold.Initial -> failwith "We've just guaranteed not Initial"
| Fold.Open { value = bal } -> let r, es = decide bal in Some r, es
| Fold.Closed _ -> None, []
// Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event
let (balance, isOpen), _ =
| Fold.Initial -> failwith "We've just guaranteed not Initial"
| Fold.Open history -> let r, es = decide state in Some r, es
| Fold.Closed _ -> None, []
// 3. Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event
let (history, isOpen), _ =
acc.Ingest state <|
match state with
| Fold.Initial -> failwith "Can't be Initial"
| Fold.Open ({ value = bal } as openState) when shouldClose openState -> (bal, false), [Events.Closed]
| Fold.Open { value = bal } -> (bal, true), []
| Fold.Closed bal -> (bal, false), []
{ balance = balance; result = result; isOpen = isOpen }, acc.Accumulated
| Fold.Initial -> failwith "Can't be Initial"
| Fold.Open history ->
if shouldClose history then (history, false), [Events.Closed]
else (history, true), []
| Fold.Closed history -> (history, false), []
{ history = history; result = result; isOpen = isOpen }, acc.Accumulated

type DupCheckResult = NotDuplicate | IdempotentInsert of Fold.Balance | DupCarriedForward
let private tryFindDup transactionId (history : Fold.Record list) =
let tryMatch : Fold.Record -> Fold.Balance option option = function
| Fold.Step { balance = bal; id = id } when id = transactionId -> Some (Some bal)
| Fold.Init { recentTransactions = prevs } when prevs |> Array.contains transactionId -> Some None
| _ -> None
match history |> Seq.tryPick tryMatch with
| None -> NotDuplicate
| Some None -> DupCarriedForward
| Some (Some bal) -> IdempotentInsert bal

type Command =
| Reset of value : int
| Add of delta : int
| Remove of delta : int

type Result = Accepted of Fold.Balance | DupFromPreviousEpoch

let decide transactionId command (state: Fold.State) =
match state with
| Fold.Closed _ | Fold.Initial -> failwithf "Cannot apply in state %A" state
| Fold.Open history ->

match tryFindDup transactionId history with
| IdempotentInsert bal -> Accepted bal, []
| DupCarriedForward -> DupFromPreviousEpoch, []
| NotDuplicate ->

let e =
match command with
| Reset value -> Events.Reset {| value = value; id = transactionId |}
| Add delta -> Events.Added {| delta = delta; id = transactionId |}
| Remove delta -> Events.Removed {| delta = delta; id = transactionId |}
match Fold.evolve state e with
| Fold.Open (Fold.Current cur) -> Accepted cur, []
| s -> failwithf "Unexpected state %A" s

type Service internal (log, resolve, maxAttempts) =

Expand Down
4 changes: 2 additions & 2 deletions global.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"sdk": {
"version": "2.1.500"
"version": "3.1.101"
}
}
}

0 comments on commit 33e4a9b

Please sign in to comment.