Skip to content

Commit

Permalink
refactor: Track updates, CategoryName (#134)
Browse files Browse the repository at this point in the history
- CosmosStoreSource monitoring
- Handle various renames
  • Loading branch information
bartelink committed Nov 25, 2023
1 parent f510b27 commit b4474d6
Show file tree
Hide file tree
Showing 94 changed files with 397 additions and 407 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Changed

- Target `Equinox` v `4.0.0-rc.13`, `Propulsion` v `3.0.0-rc.8.10`, `FsCodec` v `3.0.0-rc.11.1` [#131](https://github.com/jet/dotnet-templates/pull/131)
- Target `Equinox` v `4.0.0-rc.14.5`, `Propulsion` v `3.0.0-rc.9.11`, `FsCodec` v `3.0.0-rc.14.1` [#131](https://github.com/jet/dotnet-templates/pull/131)

### Removed

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ let start (args : Args.Arguments) =
(yields a started application loop)
```

### `run`, `main` functions
### `run`, `main` function

The `run` function formalizes the overall pattern. It is responsible for:

Expand Down
8 changes: 4 additions & 4 deletions equinox-patterns/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

<ItemGroup>
<!-- Equinox.Core.Batching -->
<PackageReference Include="Equinox.Core" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.11.1" />
<PackageReference Include="Equinox.Core" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.14.1" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion equinox-patterns/Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Codec =
module Memory =

let create name codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial)

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20
let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/ContainerTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ open Swensen.Unquote
let [<Property>] ``events roundtrip`` (x: Events.Event) =
let ee = Events.codec.Encode((), x)
let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
let des = Events.codec.TryDecode e
let des = Events.codec.Decode e
test <@ des = ValueSome x @>
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />

<PackageReference Include="FsCheck.Xunit" Version="3.0.0-beta2" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.8.10" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.9.11" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/FinalizationProcessTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Properties(testOutput) =
test <@ res1 && set eventTypes = set expectedEvents @>
let containerEvents =
buffer.Queue(Container.Reactions.streamName containerId1)
|> Seq.chooseV (FsCodec.Deflate.EncodeUncompressed Container.Events.codec).TryDecode
|> Seq.chooseV (FsCodec.Compression.EncodeUncompressed Container.Events.codec).Decode
|> List.ofSeq
test <@ match containerEvents with
| [ Container.Events.Finalized e ] -> e.shipmentIds = requestedShipmentIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ open Swensen.Unquote
let [<Property>] ``events roundtrip`` (x: Events.Event) =
let ee = Events.codec.Encode((), x)
let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
let des = Events.codec.TryDecode e
let des = Events.codec.Decode e
test <@ des = ValueSome x @>
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/ShipmentTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ open Swensen.Unquote
let [<Property>] ``events roundtrip`` (x: Events.Event) =
let ee = Events.codec.Encode((), x)
let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
let des = Events.codec.TryDecode e
let des = Events.codec.Decode e
test <@ des = ValueSome x @>
18 changes: 8 additions & 10 deletions equinox-shipping/Domain/Container.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
module Shipping.Domain.Container

module private Stream =
let [<Literal>] Category = "Container"
let id = FsCodec.StreamId.gen ContainerId.toString
let name = id >> FsCodec.StreamName.create Category
let [<Literal>] private CategoryName = "Container"
let private streamId = FsCodec.StreamId.gen ContainerId.toString

module Reactions =
let streamName = Stream.name
let streamName = streamId >> FsCodec.StreamName.create CategoryName

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -44,8 +42,8 @@ type Service internal (resolve: ContainerId -> Equinox.Decider<Events.Event, Fol
module Factory =

let private (|Category|) = function
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
10 changes: 5 additions & 5 deletions equinox-shipping/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.11.1" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.14.1" />
</ItemGroup>

</Project>
26 changes: 12 additions & 14 deletions equinox-shipping/Domain/FinalizationTransaction.fs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
module Shipping.Domain.FinalizationTransaction

module private Stream =
let [<Literal>] Category = "FinalizationTransaction"
let id = FsCodec.StreamId.gen TransactionId.toString
let decodeId = FsCodec.StreamId.dec TransactionId.parse
let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId
let [<Literal>] private CategoryName = "FinalizationTransaction"
let private streamId = FsCodec.StreamId.gen TransactionId.toString
let private catId = CategoryId(CategoryName, streamId, FsCodec.StreamId.dec TransactionId.parse)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -36,8 +34,8 @@ module Reactions =
/// Used by the Watchdog to infer whether a given event signifies that the processing has reached a terminal state
let isTerminalEvent (encoded: FsCodec.ITimelineEvent<_>) =
encoded.EventType = nameof(Events.Completed)
let [<Literal>] Category = Stream.Category
let [<return: Struct>] (|For|_|) = Stream.tryDecode
let [<Literal>] categoryName = CategoryName
let [<return: Struct>] (|For|_|) = catId.TryDecode

module Fold =

Expand Down Expand Up @@ -84,9 +82,9 @@ module Flow =
match state, event with
| Fold.State.Initial, Events.FinalizationRequested _
| Fold.State.Reserving _, Events.RevertCommenced _
| Fold.State.Reserving _, Events.ReservationCompleted _
| Fold.State.Reserving _, Events.ReservationCompleted
| Fold.State.Reverting _, Events.Completed
| Fold.State.Assigning _, Events.AssignmentCompleted _
| Fold.State.Assigning _, Events.AssignmentCompleted
| Fold.State.Assigned _, Events.Completed -> true
| _ -> false

Expand All @@ -107,8 +105,8 @@ type Service internal (resolve: TransactionId -> Equinox.Decider<Events.Event, F
module Factory =

let private (|Category|) = function
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
47 changes: 24 additions & 23 deletions equinox-shipping/Domain/Shipment.fs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
module Shipping.Domain.Shipment

module private Stream =
let [<Literal>] Category = "Shipment"
let id = FsCodec.StreamId.gen ShipmentId.toString
let [<Literal>] private CategoryName = "Shipment"
let private streamId = FsCodec.StreamId.gen ShipmentId.toString

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -30,40 +29,42 @@ module Fold =
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let toSnapshot (state: State) = Events.Snapshotted {| reservation = state.reservation; association = state.association |}

let decideReserve transactionId: Fold.State -> bool * Events.Event[] = function
| { reservation = Some r } when r = transactionId -> true, [||]
| { reservation = None } -> true, [| Events.Reserved {| transaction = transactionId |} |]
| _ -> false, [||]
module Decisions =

let reserve transactionId: Fold.State -> bool * Events.Event[] = function
| { reservation = Some r } when r = transactionId -> true, [||]
| { reservation = None } -> true, [| Events.Reserved {| transaction = transactionId |} |]
| _ -> false, [||]

let interpretRevoke transactionId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Revoked |]
| _ -> [||] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId
let revoke transactionId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Revoked |]
| _ -> [||] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId

let interpretAssign transactionId containerId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Assigned {| container = containerId |} |]
| _ -> [||] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned
let assign transactionId containerId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Assigned {| container = containerId |} |]
| _ -> [||] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned

type Service internal (resolve: ShipmentId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.TryReserve(shipmentId, transactionId): Async<bool> =
let decider = resolve shipmentId
decider.Transact(decideReserve transactionId)
decider.Transact(Decisions.reserve transactionId)

member _.Revoke(shipmentId, transactionId): Async<unit> =
let decider = resolve shipmentId
decider.Transact(interpretRevoke transactionId)
decider.Transact(Decisions.revoke transactionId)

member _.Assign(shipmentId, containerId, transactionId): Async<unit> =
let decider = resolve shipmentId
decider.Transact(interpretAssign transactionId containerId)
decider.Transact(Decisions.assign transactionId containerId)

module Factory =

let private (|Category|) = function
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
4 changes: 2 additions & 2 deletions equinox-shipping/Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module Codec =
module Memory =

let create name codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial)

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20
let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
Expand All @@ -40,7 +40,7 @@ module Dynamo =
open Equinox.DynamoStore

let private createCached name codec initial fold accessStrategy (context, cache) =
DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, cacheStrategy cache)
DynamoStoreCategory(context, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, cacheStrategy cache)

let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) =
let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot)
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain/TransactionWatchdog.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ let fold: Events.Categorization seq -> Fold.State =

let (|TransactionStatus|) (codec: #FsCodec.IEventCodec<_, _, _>) events: Fold.State =
events
|> Seq.choose (codec.TryDecode >> function ValueSome x -> Some x | ValueNone -> None)
|> Seq.chooseV codec.Decode
|> fold

module Finalization =
Expand Down
9 changes: 9 additions & 0 deletions equinox-shipping/Domain/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,16 @@ module TransactionId =
let parse (x: string): TransactionId = %x
let (|Parse|) = parse

module Seq =

let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () }

module Guid =

let inline toStringN (x: System.Guid) = x.ToString "N"
let generateStringN () = let g = System.Guid.NewGuid() in toStringN g

/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers
type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) =
member _.StreamName = gen >> FsCodec.StreamName.create name
member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec
2 changes: 0 additions & 2 deletions equinox-shipping/Watchdog.Integration/CosmosConnector.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Shipping.Watchdog.Integration

open Shipping.Infrastructure

type CosmosConnector(connectionString, databaseId, containerId) =

let discovery = connectionString |> Equinox.CosmosStore.Discovery.ConnectionString
Expand Down
2 changes: 0 additions & 2 deletions equinox-shipping/Watchdog.Integration/DynamoConnector.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Shipping.Watchdog.Integration

open Shipping.Infrastructure

type DynamoConnector(connector: Equinox.DynamoStore.DynamoStoreConnector, table, indexTable) =

let client = connector.CreateClient()
Expand Down
1 change: 0 additions & 1 deletion equinox-shipping/Watchdog.Integration/EsdbConnector.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Shipping.Watchdog.Integration

open Shipping.Infrastructure
open System

type EsdbConnector(connection, credentials) =
Expand Down
8 changes: 2 additions & 6 deletions equinox-shipping/Watchdog.Integration/ReactorFixture.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ namespace Shipping.Watchdog.Integration

open Propulsion.Internal // IntervalTimer etc
open Shipping.Domain.Tests
open Shipping.Infrastructure
open Shipping.Watchdog
open System

Expand Down Expand Up @@ -38,10 +37,7 @@ type FixtureBase(messageSink, store, dumpStats, createSourceConfig) =
if stats.StatsInterval.RemainingMs > 3000 then
stats.StatsInterval.Trigger()
stats.StatsInterval.SleepUntilTriggerCleared()
member _.Await(propagationDelay) =
match awaitReactions with
| Some f -> f propagationDelay |> Async.ofTask
| None -> async { () }
member _.Await(propagationDelay) = awaitReactions propagationDelay |> Async.ofTask

interface IDisposable with

Expand Down Expand Up @@ -80,7 +76,7 @@ module CosmosReactor =
let store, monitored, leases = conn.Connect()
let createSourceConfig consumerGroupName =
let checkpointConfig = CosmosFeedConfig.Ephemeral consumerGroupName
SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval)
SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, TimeSpan.FromSeconds 60.)
new Fixture(messageSink, store, createSourceConfig)
member _.NullWait(_arguments) = async.Zero () // We could wire up a way to await all tranches having caught up, but not implemented yet
member val private Timeout = if System.Diagnostics.Debugger.IsAttached then TimeSpan.FromHours 1. else TimeSpan.FromMinutes 1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<!-- jsii Roslyn analyzers (un-comment to obtain compile-time checks for missing required props-->
<!-- <PackageReference Include="Amazon.Jsii.Analyzers" Version="*" PrivateAssets="all" />-->

<PackageReference Include="Propulsion.DynamoStore.Constructs" Version="3.0.0-rc.8.10" />
<PackageReference Include="Propulsion.DynamoStore.Constructs" Version="3.0.0-rc.9.11" />
</ItemGroup>

</Project>
Loading

0 comments on commit b4474d6

Please sign in to comment.