diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b8f80baf..62e29ca31 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,11 +9,17 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ## [Unreleased]
 ### Added
+
+- `proIndexer`: Template demonstrating a reactor app that maintains a summarised form of an Aggregate's state as a `RollingState` in a separate `Views` Container [#132](https://github.com/jet/dotnet-templates/pull/132)
+
 ### Changed
-- Target `Equinox` v `4.0.0-rc.13`, `Propulsion` v `3.0.0-rc.8.10`, `FsCodec` v `3.0.0-rc.11.1`
+- Target `Equinox` v `4.0.0-rc.13`, `Propulsion` v `3.0.0-rc.8.10`, `FsCodec` v `3.0.0-rc.11.1` [#131](https://github.com/jet/dotnet-templates/pull/131)
 ### Removed
+
+- `proCosmosReactor`: Extended/split to become `proIndexer` [#132](https://github.com/jet/dotnet-templates/pull/132)
+
 ### Fixed
diff --git a/README.md b/README.md
index a867bc79b..104a8fd08 100644
--- a/README.md
+++ b/README.md
@@ -83,14 +83,18 @@ The specific behaviors carried out in reaction to incoming events often use `Equ
   - It is necessary to reset the CFP checkpoint (delete the checkpoint documents, or use a new Consumer Group Name) to trigger a re-traversal if events have expired since the last time a traversal took place.
-- [`proCosmosReactor`](propulsion-cosmos-reactor/README.md) - Stripped down derivative of `proReactor` template. :pray: [@ragiano215](https://github.com/ragiano215)
+- [`proIndexer`](propulsion-indexer/README.md) - Derivative of `proReactor` template. :pray: [@ragiano215](https://github.com/ragiano215)
-  - Specific to CosmosDb
+  - Specific to CosmosDB, though it would be easy to make it support DynamoDB
   - For applications where the reactions use the same Container, credentials etc as the one being Monitored by the change feed processor (simpler config wiring and less argument processing)
   - includes full wiring for Prometheus metrics emission from the Handler outcomes
+  - Demonstrates the notion of an `App` project that hosts wiring common to a set of applications, without having the Domain layer reference any of it.
+
+  - Implements `sync` and `snapshot` subcommands to enable updating snapshots and/or keeping a cloned database in sync
+
 - [`eqxShipping`](equinox-shipping/README.md) - Example demonstrating the implementation of a [Process Manager](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ProcessManager.html) using [`Equinox`](https://github.com/jet/equinox) that manages the enlistment of a set of `Shipment` Aggregate items into a separated `Container` Aggregate as an atomic operation. :pray: [@Kimserey](https://github.com/Kimserey).
@@ -189,11 +193,11 @@ To use from the command line, the outline is:
     dotnet new eqxShipping
     # ... to add a Reactor against a Cosmos container for both listening and writing
-    md -p ../CosmosReactor | Set-Location
-    dotnet new proCosmosReactor
+    md -p ../Indexer | Set-Location
+    dotnet new proIndexer
    # ...
to add a Hotel Sample for use with MessageDb or DynamoDb - md -p ../ProGotel | Set-Location + md -p ../ProHotel | Set-Location dotnet new proHotel ## TESTING diff --git a/dotnet-templates.sln b/dotnet-templates.sln index ca77c3c96..d08d51fd8 100644 --- a/dotnet-templates.sln +++ b/dotnet-templates.sln @@ -142,13 +142,13 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "FeedConsumer", "feed-consum EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "feed-source\Domain.Tests\Domain.Tests.fsproj", "{E0F63351-FADB-4972-BDE7-D5F5731B8EE4}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "proReactorCosmos", "proReactorCosmos", "{3E4CC1AC-FFA2-457E-9103-ACB497BCB270}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "proIndexer", "proIndexer", "{3E4CC1AC-FFA2-457E-9103-ACB497BCB270}" ProjectSection(SolutionItems) = preProject - propulsion-cosmos-reactor\.template.config\template.json = propulsion-cosmos-reactor\.template.config\template.json + propulsion-indexer\README.md = propulsion-indexer\README.md + propulsion-indexer\.template.config\template.json = propulsion-indexer\.template.config\template.json + propulsion-indexer\propulsion-indexer.sln = propulsion-indexer\propulsion-indexer.sln EndProjectSection EndProject -Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Reactor", "propulsion-cosmos-reactor\Reactor.fsproj", "{3A22EA30-C741-41EA-9499-7C93E5EE29FA}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "periodicIngester", "periodicIngester", "{4738344F-839D-4B82-8417-1B2D298E76C2}" ProjectSection(SolutionItems) = preProject periodic-ingester\.template.config\template.json = periodic-ingester\.template.config\template.json @@ -181,6 +181,12 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Reactor", "propulsion-hotel EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Reactor.Integration", "propulsion-hotel\Reactor.Integration\Reactor.Integration.fsproj", "{36D15020-4ED7-49FE-B0C3-FC89C45655D2}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Indexer", "propulsion-indexer\Indexer\Indexer.fsproj", "{17DF28A5-3A3A-474F-B4E8-2AF30CE73C38}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "App", "propulsion-indexer\App\App.fsproj", "{233053E0-41B3-45CE-8EB0-93CCBCB8FF0D}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "propulsion-indexer\Domain\Domain.fsproj", "{4A9F4729-12F7-4423-8A27-012913E39A53}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -287,10 +293,6 @@ Global {E0F63351-FADB-4972-BDE7-D5F5731B8EE4}.Debug|Any CPU.Build.0 = Debug|Any CPU {E0F63351-FADB-4972-BDE7-D5F5731B8EE4}.Release|Any CPU.ActiveCfg = Release|Any CPU {E0F63351-FADB-4972-BDE7-D5F5731B8EE4}.Release|Any CPU.Build.0 = Release|Any CPU - {3A22EA30-C741-41EA-9499-7C93E5EE29FA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {3A22EA30-C741-41EA-9499-7C93E5EE29FA}.Debug|Any CPU.Build.0 = Debug|Any CPU - {3A22EA30-C741-41EA-9499-7C93E5EE29FA}.Release|Any CPU.ActiveCfg = Release|Any CPU - {3A22EA30-C741-41EA-9499-7C93E5EE29FA}.Release|Any CPU.Build.0 = Release|Any CPU {AB0C8BC0-4C30-4177-9469-C06E97C9568E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {AB0C8BC0-4C30-4177-9469-C06E97C9568E}.Debug|Any CPU.Build.0 = Debug|Any CPU {AB0C8BC0-4C30-4177-9469-C06E97C9568E}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -323,6 +325,18 @@ Global {36D15020-4ED7-49FE-B0C3-FC89C45655D2}.Debug|Any CPU.Build.0 = Debug|Any CPU 
{36D15020-4ED7-49FE-B0C3-FC89C45655D2}.Release|Any CPU.ActiveCfg = Release|Any CPU {36D15020-4ED7-49FE-B0C3-FC89C45655D2}.Release|Any CPU.Build.0 = Release|Any CPU + {17DF28A5-3A3A-474F-B4E8-2AF30CE73C38}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {17DF28A5-3A3A-474F-B4E8-2AF30CE73C38}.Debug|Any CPU.Build.0 = Debug|Any CPU + {17DF28A5-3A3A-474F-B4E8-2AF30CE73C38}.Release|Any CPU.ActiveCfg = Release|Any CPU + {17DF28A5-3A3A-474F-B4E8-2AF30CE73C38}.Release|Any CPU.Build.0 = Release|Any CPU + {233053E0-41B3-45CE-8EB0-93CCBCB8FF0D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {233053E0-41B3-45CE-8EB0-93CCBCB8FF0D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {233053E0-41B3-45CE-8EB0-93CCBCB8FF0D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {233053E0-41B3-45CE-8EB0-93CCBCB8FF0D}.Release|Any CPU.Build.0 = Release|Any CPU + {4A9F4729-12F7-4423-8A27-012913E39A53}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4A9F4729-12F7-4423-8A27-012913E39A53}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4A9F4729-12F7-4423-8A27-012913E39A53}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4A9F4729-12F7-4423-8A27-012913E39A53}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -351,7 +365,6 @@ Global {28EBD7E4-6130-4444-B44F-BB741172A233} = {CF54228E-4DC5-4CE4-B5BC-B9A5D438B03F} {FE141E84-1AA3-4136-BD7C-318AE7BD84A5} = {EDC5EB0D-EE6D-489A-B0B0-DA85ECDF04B2} {E0F63351-FADB-4972-BDE7-D5F5731B8EE4} = {CF54228E-4DC5-4CE4-B5BC-B9A5D438B03F} - {3A22EA30-C741-41EA-9499-7C93E5EE29FA} = {3E4CC1AC-FFA2-457E-9103-ACB497BCB270} {AB0C8BC0-4C30-4177-9469-C06E97C9568E} = {4738344F-839D-4B82-8417-1B2D298E76C2} {B3B0C203-2648-432F-B49B-F9F8E1A4B4F1} = {A3A3AA9F-E039-4D1A-BA64-A8291A239861} {2D673A49-19A2-41E9-B32E-E73FE05A8457} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} @@ -360,6 +373,9 @@ Global {DE76D4BF-619A-4553-A113-6F8D83CE63D6} = {C76DEE36-C648-4351-9BB6-F4470D72D473} {0F4E4FF4-0471-4D1E-B80D-F6AB8EA87DD5} = {C76DEE36-C648-4351-9BB6-F4470D72D473} {36D15020-4ED7-49FE-B0C3-FC89C45655D2} = {C76DEE36-C648-4351-9BB6-F4470D72D473} + {17DF28A5-3A3A-474F-B4E8-2AF30CE73C38} = {3E4CC1AC-FFA2-457E-9103-ACB497BCB270} + {233053E0-41B3-45CE-8EB0-93CCBCB8FF0D} = {3E4CC1AC-FFA2-457E-9103-ACB497BCB270} + {4A9F4729-12F7-4423-8A27-012913E39A53} = {3E4CC1AC-FFA2-457E-9103-ACB497BCB270} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {D8B64643-4049-466D-BAFC-0437B8C1E508} diff --git a/propulsion-cosmos-reactor/Contract.fs b/propulsion-cosmos-reactor/Contract.fs deleted file mode 100644 index bd412c96e..000000000 --- a/propulsion-cosmos-reactor/Contract.fs +++ /dev/null @@ -1,14 +0,0 @@ -module ReactorTemplate.Contract - -/// A single Item in the list -type ItemInfo = { id: int; order: int; title: string; completed: bool } - -type SummaryInfo = { items: ItemInfo[] } - -let render (item: Todo.Events.ItemData): ItemInfo = - { id = item.id - order = item.order - title = item.title - completed = item.completed } -let ofState (state: Todo.Fold.State): SummaryInfo = - { items = [| for x in state.items -> render x |]} diff --git a/propulsion-cosmos-reactor/Program.fs b/propulsion-cosmos-reactor/Program.fs deleted file mode 100644 index fbf735a41..000000000 --- a/propulsion-cosmos-reactor/Program.fs +++ /dev/null @@ -1,148 +0,0 @@ -module ReactorTemplate.Program - -open Serilog -open System - -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - -type Configuration(tryGet) = - 
- let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" - - member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" - member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" - member _.CosmosContainer = get "EQUINOX_COSMOS_CONTAINER" - -module Args = - - open Argu - [] - type Parameters = - | [] Verbose - | [] PrometheusPort of int - | [] ProcessorName of string - | [] MaxReadAhead of int - | [] MaxWriters of int - | [] Cosmos of ParseResults - interface IArgParserTemplate with - member p.Usage = p |> function - | Verbose -> "request Verbose Logging. Default: off." - | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off." - | ProcessorName _ -> "Projector consumer group name." - | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 2." - | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8." - | Cosmos _ -> "specify CosmosDB input parameters" - and Arguments(c: Configuration, p: ParseResults) = - let maxReadAhead = p.GetResult(MaxReadAhead, 2) - let maxConcurrentProcessors = p.GetResult(MaxWriters, 8) - member val Verbose = p.Contains Parameters.Verbose - member val PrometheusPort = p.TryGetResult PrometheusPort - member val ProcessorName = p.GetResult ProcessorName - member x.ProcessorParams() = Log.Information("Reacting... {processorName}, reading {maxReadAhead} ahead, {dop} writers", - x.ProcessorName, maxReadAhead, maxConcurrentProcessors) - (x.ProcessorName, maxReadAhead, maxConcurrentProcessors) - member val StatsInterval = TimeSpan.FromMinutes 1. - member val StateInterval = TimeSpan.FromMinutes 5. - member val Cosmos = CosmosArguments(c, p.GetResult Cosmos) - and [] CosmosParameters = - | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode - | [] Connection of string - | [] Database of string - | [] Container of string - | [] Timeout of float - | [] Retries of int - | [] RetriesWaitTime of float - - | [] Verbose - | [] LeaseContainer of string - | [] FromTail - | [] MaxItems of int - | [] LagFreqM of float - interface IArgParserTemplate with - member p.Usage = p |> function - | ConnectionMode _ -> "override the connection mode. Default: Direct." - | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" - | Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" - | Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" - | Timeout _ -> "specify operation timeout in seconds. Default: 5." - | Retries _ -> "specify operation retries. Default: 1." - | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." - - | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" - | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." - | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." - | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. 
Default: 1" - and CosmosArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult CosmosParameters.Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString - let mode = p.TryGetResult ConnectionMode - let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds - let retries = p.GetResult(Retries, 1) - let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds - let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) - - let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") - let fromTail = p.Contains FromTail - let maxItems = p.TryGetResult MaxItems - let lagFrequency = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes - member val Verbose = p.Contains Verbose - member val MonitoringParams = fromTail, maxItems, lagFrequency - member _.ConnectWithFeed() = connector.ConnectWithFeed(database, containerId, leaseContainerId) - - /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args - let parse tryGetConfigValue argv: Arguments = - let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name - let parser = ArgumentParser.Create(programName=programName) - Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv) - -let [] AppName = "ReactorTemplate" - -let build (args: Args.Arguments) = - let processorName, maxReadAhead, maxConcurrentStreams = args.ProcessorParams() - let context, monitored, leases = args.Cosmos.ConnectWithFeed() |> Async.RunSynchronously - let sink = - let store = - let cache = Equinox.Cache(AppName, sizeMb = 10) - Store.Config.Cosmos (context, cache) - let stats = Reactor.Stats(Log.Logger, args.StatsInterval, args.StateInterval) - let handle = Reactor.Factory.createHandler store - Reactor.Factory.StartSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) - let source = - let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumCategoryEvents Reactor.reactionCategories - let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect parseFeedDoc) - let startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams - Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) - sink, source - -// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly -let startMetricsServer port: IDisposable = - let metricsServer = new Prometheus.KestrelMetricServer(port = port) - let ms = metricsServer.Start() - Log.Information("Prometheus /metrics endpoint on port {port}", port) - { new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() } - -open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException - -let run args = async { - let sink, source = build args - use _metricsServer: IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj - return! 
[| Async.AwaitKeyboardInterruptAsTaskCanceledException() - source.AwaitWithStopOnCancellation() - sink.AwaitWithStopOnCancellation() - |] |> Async.Parallel |> Async.Ignore } - -[] -let main argv = - try let args = Args.parse EnvVar.tryGet argv - try let metrics = Sinks.equinoxAndPropulsionCosmosConsumerMetrics (Sinks.tags AppName) args.ProcessorName - Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() - try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 - finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintfn "Exception %s" e.Message; 1 diff --git a/propulsion-cosmos-reactor/Store.fs b/propulsion-cosmos-reactor/Store.fs deleted file mode 100644 index 89ad8fbde..000000000 --- a/propulsion-cosmos-reactor/Store.fs +++ /dev/null @@ -1,32 +0,0 @@ -module ReactorTemplate.Store - -module Metrics = - - let log = Serilog.Log.ForContext("isMetric", true) - -let createDecider cat = Equinox.Decider.forStream Metrics.log cat - -module Codec = - - let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = - FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default - -let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 -let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) - -module Cosmos = - - let private createCached name codec initial fold accessStrategy (context, cache) = - Equinox.CosmosStore.CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache) - - let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = - let accessStrategy = Equinox.CosmosStore.AccessStrategy.Snapshot (isOrigin, toSnapshot) - createCached name codec initial fold accessStrategy (context, cache) - - let createRollingState name codec initial fold toSnapshot (context, cache) = - let accessStrategy = Equinox.CosmosStore.AccessStrategy.RollingState toSnapshot - createCached name codec initial fold accessStrategy (context, cache) - -[] -type Config = - | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Cache diff --git a/propulsion-cosmos-reactor/.template.config/template.json b/propulsion-indexer/.template.config/template.json similarity index 55% rename from propulsion-cosmos-reactor/.template.config/template.json rename to propulsion-indexer/.template.config/template.json index e821f84a2..247239b89 100644 --- a/propulsion-cosmos-reactor/.template.config/template.json +++ b/propulsion-indexer/.template.config/template.json @@ -1,6 +1,6 @@ { "$schema": "http://json.schemastore.org/template", - "author": "@jet @ragiano215 @bartelink", + "author": "@jet @rockwell-automation-inc @ragiano215 @bartelink", "classifications": [ "CosmosDb", "ChangeFeed", @@ -12,11 +12,11 @@ ], "tags": { "language": "F#", - "type": "project" + "type": "solution" }, - "identity": "Propulsion.Template.CosmosReactor", - "name": "Propulsion Cosmos Reactor", - "shortName": "proCosmosReactor", - "sourceName": "ReactorTemplate", + "identity": "Propulsion.Indexer", + "name": "Propulsion Indexer", + "shortName": "proIndexer", + "sourceName": "IndexerTemplate", "preferNameDirectory": true } diff --git a/propulsion-cosmos-reactor/Reactor.fsproj b/propulsion-indexer/App/App.fsproj similarity index 54% rename from 
propulsion-cosmos-reactor/Reactor.fsproj rename to propulsion-indexer/App/App.fsproj index e39c407dd..3ec78ee35 100644 --- a/propulsion-cosmos-reactor/Reactor.fsproj +++ b/propulsion-indexer/App/App.fsproj @@ -1,30 +1,22 @@ - + - Exe net6.0 5 + true - - - - - - - - + - - + diff --git a/propulsion-indexer/App/Configuration.fs b/propulsion-indexer/App/Configuration.fs new file mode 100644 index 000000000..57c072722 --- /dev/null +++ b/propulsion-indexer/App/Configuration.fs @@ -0,0 +1,19 @@ +module App.Args + +exception MissingArg of message: string with override this.Message = this.message +let missingArg msg = raise (MissingArg msg) + +let [] CONNECTION = "EQUINOX_COSMOS_CONNECTION" +let [] DATABASE = "EQUINOX_COSMOS_DATABASE" +let [] CONTAINER = "EQUINOX_COSMOS_CONTAINER" +let [] VIEWS = "EQUINOX_COSMOS_VIEWS" + +type Configuration(tryGet: string -> string option) = + + let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + + member _.CosmosConnection = get CONNECTION + member _.CosmosDatabase = get DATABASE + member _.CosmosContainer = get CONTAINER + member _.CosmosViews = get VIEWS + diff --git a/propulsion-cosmos-reactor/Infrastructure.fs b/propulsion-indexer/App/Infrastructure.fs similarity index 57% rename from propulsion-cosmos-reactor/Infrastructure.fs rename to propulsion-indexer/App/Infrastructure.fs index b609753b1..d27526389 100644 --- a/propulsion-cosmos-reactor/Infrastructure.fs +++ b/propulsion-indexer/App/Infrastructure.fs @@ -1,51 +1,18 @@ -[] -module ReactorTemplate.Infrastructure +[] +module Infrastructure -open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings open Serilog open System -module Guid = - - let inline toStringN (x: Guid) = x.ToString "N" - -/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant -type ClientId = Guid -and [] clientId -module ClientId = - let toString (value: ClientId): string = Guid.toStringN %value - let parse (value: string): ClientId = let raw = Guid.Parse value in % raw - let (|Parse|) = parse - module EnvVar = let tryGet varName: string option = Environment.GetEnvironmentVariable varName |> Option.ofObj -module Streams = - - let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span) - // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) - let private tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = - match codec.TryDecode event with - | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> - Log.ForContext("eventData", renderBody event.Data) - .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) - ValueNone - | x -> x - let (|Decode|) codec struct (stream, events: Propulsion.Sinks.Event[]): 'E[] = - events |> Propulsion.Internal.Array.chooseV (tryDecode codec stream) - - module Codec = - - let gen<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Propulsion.Sinks.Codec<'E> = - FsCodec.SystemTextJson.Codec.Create<'E>() // options = Options.Default - module Log = /// Allow logging to filter out emission of log messages whose information is also surfaced as metrics let isStoreMetrics e = Filters.Matching.WithProperty("isMetric").Invoke e - /// Equinox and Propulsion provide 
metrics as properties in log emissions /// These helpers wire those to pass through virtual Log Sinks that expose them as Prometheus metrics. module Sinks = @@ -109,9 +76,9 @@ type Equinox.CosmosStore.CosmosStoreContext with type Equinox.CosmosStore.CosmosStoreClient with - member x.CreateContext(role: string, databaseId, containerId, tipMaxEvents) = - let c = Equinox.CosmosStore.CosmosStoreContext(x, databaseId, containerId, tipMaxEvents) - c.LogConfiguration(role, databaseId, containerId) + member x.CreateContext(role: string, databaseId, containerId, tipMaxEvents, ?queryMaxItems, ?tipMaxJsonLength, ?skipLog) = + let c = Equinox.CosmosStore.CosmosStoreContext(x, databaseId, containerId, tipMaxEvents, ?queryMaxItems = queryMaxItems, ?tipMaxJsonLength = tipMaxJsonLength) + if skipLog = Some true then () else c.LogConfiguration(role, databaseId, containerId) c type Equinox.CosmosStore.CosmosStoreConnector with @@ -124,10 +91,53 @@ type Equinox.CosmosStore.CosmosStoreConnector with member private x.CreateAndInitialize(role, databaseId, containers) = x.LogConfiguration(role, databaseId, containers) x.CreateAndInitialize(databaseId, containers) - member private x.Connect(role, databaseId, containerId: string, ?auxContainerId) = async { - let! cosmosClient = x.CreateAndInitialize(role, databaseId, [| containerId; yield! Option.toList auxContainerId |]) - return cosmosClient, Equinox.CosmosStore.CosmosStoreClient(cosmosClient).CreateContext(role, databaseId, containerId, tipMaxEvents = 256) } - member x.ConnectWithFeed(databaseId, containerId, auxContainerId) = async { - let! cosmosClient, context = x.Connect("Main", databaseId, containerId, auxContainerId) + member private x.Connect(role, databaseId, containers) = + x.LogConfiguration(role, databaseId, containers) + x.Connect(databaseId, containers) + member private x.Connect(role, databaseId, containerId, viewsContainerId, ?auxContainerId, ?logSnapshotConfig) = async { + let! cosmosClient = x.CreateAndInitialize(role, databaseId, [| yield containerId; yield viewsContainerId; yield! Option.toList auxContainerId |]) + let client = Equinox.CosmosStore.CosmosStoreClient(cosmosClient) + let contexts = + client.CreateContext(role, databaseId, containerId, tipMaxEvents = 256, queryMaxItems = 500), + client.CreateContext(role, databaseId, viewsContainerId, tipMaxEvents = 256, queryMaxItems = 500), + // NOTE the tip limits for this connection are set to be effectively infinite in order to ensure that writes never trigger calving from the tip + client.CreateContext("snapshotUpdater", databaseId, containerId, tipMaxEvents = 1024, tipMaxJsonLength = 1024 * 1024, + skipLog = not (logSnapshotConfig = Some true)) + return cosmosClient, contexts } + member x.ConnectWithFeed(databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig) = async { + let! cosmosClient, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig = logSnapshotConfig) let source, leases = CosmosStoreConnector.getSourceAndLeases cosmosClient databaseId containerId auxContainerId - return context, source, leases } + return contexts, source, leases } + + /// Indexer Sync mode: When using a ReadOnly connection string, the leases need to be maintained alongside the target + member x.ConnectWithFeedReadOnly(databaseId, containerId: string, viewsContainerId, auxClient, auxDatabaseId, auxContainerId) = async { + let! 
client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId = viewsContainerId) + let source = CosmosStoreConnector.getSource client databaseId containerId + let leases = CosmosStoreConnector.getLeases auxClient auxDatabaseId auxContainerId + return contexts, source, leases } + + /// Indexer Sync mode: Connects to an External Store that we want to Sync into + member x.ConnectExternal(role, databaseId, containerId) = async { + let! client = x.Connect(role, databaseId, [| containerId |]) + return client.CreateContext(role, databaseId, containerId, tipMaxEvents = 128) } + +type Factory private () = + + static member StartSink(log, stats, maxConcurrentStreams, handle, maxReadAhead) = + Propulsion.Sinks.Factory.StartConcurrent(log, maxReadAhead, maxConcurrentStreams, handle, stats) + +module OutcomeKind = + + let [] (|StoreExceptions|_|) (exn: exn) = + match exn with + | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | _ -> ValueNone + +// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly +let startMetricsServer port: IDisposable = + let metricsServer = new Prometheus.KestrelMetricServer(port = port) + let ms = metricsServer.Start() + Log.Information("Prometheus /metrics endpoint on port {port}", port) + { new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() } diff --git a/propulsion-indexer/Domain/Domain.fsproj b/propulsion-indexer/Domain/Domain.fsproj new file mode 100644 index 000000000..b71ed65fe --- /dev/null +++ b/propulsion-indexer/Domain/Domain.fsproj @@ -0,0 +1,24 @@ + + + + net6.0 + 5 + true + + + + + + + + + + + + + + + + + + diff --git a/propulsion-indexer/Domain/Infrastructure.fs b/propulsion-indexer/Domain/Infrastructure.fs new file mode 100644 index 000000000..3e11ec691 --- /dev/null +++ b/propulsion-indexer/Domain/Infrastructure.fs @@ -0,0 +1,29 @@ +[] +module Infrastructure + +open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings +open System + +module Guid = + + let inline toStringN (x: Guid) = x.ToString "N" + +module TimeSpan = + + let seconds value = TimeSpan.FromSeconds value + +/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant +type ClientId = Guid +and [] clientId +module ClientId = + let toString (value: ClientId): string = Guid.toStringN %value + let parse (value: string): ClientId = let raw = Guid.Parse value in % raw + let (|Parse|) = parse + +type Equinox.Decider<'e, 's> with + + member x.TransactWithPostVersion(decide: 's -> 'r * 'e[]) = + x.TransactEx((fun (c: Equinox.ISyncContext<_>) -> decide c.State), + (fun res (c: Equinox.ISyncContext<_>) -> res, c.Version)) + +type DataMemberAttribute = System.Runtime.Serialization.DataMemberAttribute diff --git a/propulsion-indexer/Domain/Store.fs b/propulsion-indexer/Domain/Store.fs new file mode 100644 index 000000000..2240fa018 --- /dev/null +++ b/propulsion-indexer/Domain/Store.fs @@ -0,0 +1,90 @@ +module Store + +module Metrics = + + let log = Serilog.Log.ForContext("isMetric", true) + +let createDecider cat = Equinox.Decider.forStream Metrics.log cat + 
+module Codec = + + let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = + FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default + +/// Implements a Service with a single method that visits the identified stream, with the following possible outcomes: +/// 1) stream has a 'current' snapshot (per the `isCurrentSnapshot` predicate supplied to `Snapshot.create` and/or `fold'`:- +/// - no writes take place +/// - state and version enter cache (not strictly necessary; but would enable purging of state to have a reduced effect in terms of inducing redundant loads) +/// - no further invocations should be triggered until there is a fresh event +/// 2) stream state was derived by either loading and folding all events:- +/// - pretend we are writing an event so we trigger a Sync operation +/// - The `transmuteAllEventsToUnfolds` that was supplied to `AccessStrategy.Custom`: +/// - replaces this placeholder 'event' with an empty Array of events +/// - flips the snapshot event into the 'unfolds' list +/// - The Sync Stored procedure then processes the ensuing request, replacing the current (missing or outdated) `'u`nfolds with the fresh snapshot +module Snapshotter = + + type private StateWithSnapshottedFlag<'s> = bool * 's + type Service<'id, 'e, 's> internal (resolve: 'id -> Equinox.Decider<'e, StateWithSnapshottedFlag<'s>>, generate: 's -> 'e) = + + member _.TryUpdate(id): Async = + let decider = resolve id + let decide (hasSnapshot, state) = + if hasSnapshot then false, Array.empty // case 1: no update required as the stream already has a correct snapshot + else true, generate state |> Array.singleton // case 2: yield a tentative event (which transmuteAllEventsToUnfolds will flip to being an unfold) + decider.TransactWithPostVersion(decide) + + let internal createService streamId generate cat = + let resolve = streamId >> createDecider cat + Service(resolve, generate) + + let internal initial'<'s> initial: StateWithSnapshottedFlag<'s> = false, initial + let internal fold' isCurrentSnapshot fold (_wasOrigin, s) xs: StateWithSnapshottedFlag<'s> = + // NOTE ITimelineEvent.IsUnfold and/or a generic isOrigin event would be insufficient for our needs + // The tail event encountered by the fold could either be: + // - an 'out of date' snapshot (which the normal load process would be able to upconvert from, but is not what we desire) + // - another event (if there is no snapshot of any kind) + isCurrentSnapshot (Array.last xs), fold s xs + +let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 +let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) + +[] +type Config = + | Cosmos of contexts: CosmosContexts * cache: Equinox.Cache +and [] CosmosContexts = + { main: Equinox.CosmosStore.CosmosStoreContext + views: Equinox.CosmosStore.CosmosStoreContext + /// Variant of `main` that's configured such that `module Snapshotter` updates will never trigger a calve + snapshotUpdate: Equinox.CosmosStore.CosmosStoreContext } + +module Cosmos = + + open Equinox.CosmosStore + + let private createCached name codec initial fold accessStrategy (context, cache) = + CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache) + + let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = + let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) + createCached name codec initial fold accessStrategy (context.main, cache) + + let createRollingState 
name codec initial fold toSnapshot (context, cache) = + let accessStrategy = AccessStrategy.RollingState toSnapshot + createCached name codec initial fold accessStrategy (context.views, cache) + + let createConfig (main, views, snapshotUpdate) cache = + Config.Cosmos ({ main = main; views = views; snapshotUpdate = snapshotUpdate }, cache) + + module Snapshotter = + + let private accessStrategy isOrigin = + let transmuteAllEventsToUnfolds events _state = [||], events + AccessStrategy.Custom (isOrigin, transmuteAllEventsToUnfolds) + let private createCategory name codec initial fold isCurrent (contexts, cache) = + createCached name codec (Snapshotter.initial' initial) (Snapshotter.fold' isCurrent fold) (accessStrategy isCurrent) (contexts.snapshotUpdate, cache) + + let create codec initial fold (isCurrentSnapshot, generate) streamId categoryName config = + let cat = config |> function + | Config.Cosmos (context, cache) -> createCategory categoryName codec initial fold isCurrentSnapshot (context, cache) + Snapshotter.createService streamId generate cat diff --git a/propulsion-indexer/Domain/Streams.fs b/propulsion-indexer/Domain/Streams.fs new file mode 100644 index 000000000..87abfc2d6 --- /dev/null +++ b/propulsion-indexer/Domain/Streams.fs @@ -0,0 +1,22 @@ +module Streams + +open Serilog + +module Codec = + + let gen<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Propulsion.Sinks.Codec<'E> = + FsCodec.SystemTextJson.Codec.Create<'E>() // options = Options.Default + + let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span) + + // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) + let internal tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = + match codec.TryDecode event with + | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> + Log.ForContext("eventData", renderBody event.Data) + .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) + ValueNone + | x -> x + +let (|Decode|) codec struct (stream, events: Propulsion.Sinks.Event[]): 'E[] = + events |> Propulsion.Internal.Array.chooseV (Codec.tryDecode codec stream) diff --git a/propulsion-cosmos-reactor/Todo.fs b/propulsion-indexer/Domain/Todo.fs similarity index 75% rename from propulsion-cosmos-reactor/Todo.fs rename to propulsion-indexer/Domain/Todo.fs index 64d01474d..54598f789 100644 --- a/propulsion-cosmos-reactor/Todo.fs +++ b/propulsion-indexer/Domain/Todo.fs @@ -1,4 +1,4 @@ -module ReactorTemplate.Todo +module IndexerTemplate.Domain.Todo module private Stream = let [] Category = "Todos" @@ -18,7 +18,7 @@ module Events = | Updated of ItemData | Deleted of DeletedData | Cleared of ClearedData - | Snapshotted of SnapshotData + | [] Snapshotted of SnapshotData interface TypeShape.UnionContract.IUnionContract let codec = Store.Codec.genJsonElement @@ -42,19 +42,25 @@ module Fold = type State = { items: Events.ItemData list; nextId: int } /// State implied by the absence of any events on this stream let initial = { items = []; nextId = 0 } - /// Compute State change implied by a giveC:\Users\f0f00db\Projects\dotnet-templates\propulsion-summary-projector\Todo.fsn Event + + module Snapshot = + /// Prepares an Event that encodes all relevant aspects of a State such that `evolve` can rehydrate a complete State from it + let generate state = Events.Snapshotted { nextId = 
state.nextId; items = Array.ofList state.items } + /// Determines whether a given event represents a checkpoint that implies we don't need to see any preceding events + let isOrigin = function Events.Cleared _ | Events.Snapshotted _ -> true | _ -> false + let config = isOrigin, generate + let internal hydrate (e: Events.SnapshotData): State = + { nextId = e.nextId; items = List.ofArray e.items } + + /// Compute State change implied by a given Event let evolve s = function | Events.Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 } | Events.Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) } | Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> x.id <> e.id) } | Events.Cleared e -> { nextId = e.nextId; items = [] } - | Events.Snapshotted s -> { nextId = s.nextId; items = List.ofArray s.items } + | Events.Snapshotted e -> Snapshot.hydrate e /// Folds a set of events from the store into a given `state` let fold: State -> Events.Event seq -> State = Seq.fold evolve - /// Determines whether a given event represents a checkpoint that implies we don't need to see any preceding events - let isOrigin = function Events.Cleared _ | Events.Snapshotted _ -> true | _ -> false - /// Prepares an Event that encodes all relevant aspects of a State such that `evolve` can rehydrate a complete State from it - let toSnapshot state = Events.Snapshotted { nextId = state.nextId; items = Array.ofList state.items } /// Defines operations that a Controller or Projector can perform on a Todo List type Service internal (resolve: ClientId -> Equinox.Decider) = @@ -67,7 +73,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider Store.Cosmos.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, cache) let create (Category cat) = Service(Stream.id >> Store.createDecider cat) - diff --git a/propulsion-cosmos-reactor/TodoSummary.fs b/propulsion-indexer/Domain/TodoIndex.fs similarity index 96% rename from propulsion-cosmos-reactor/TodoSummary.fs rename to propulsion-indexer/Domain/TodoIndex.fs index a20a07b22..9e909f813 100644 --- a/propulsion-cosmos-reactor/TodoSummary.fs +++ b/propulsion-indexer/Domain/TodoIndex.fs @@ -1,7 +1,7 @@ -module ReactorTemplate.TodoSummary +module IndexerTemplate.Domain.TodoIndex module private Stream = - let [] Category = "TodoSummary" + let [] Category = "$TodoIndex" let id = FsCodec.StreamId.gen ClientId.toString // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care diff --git a/propulsion-cosmos-reactor/Reactor.fs b/propulsion-indexer/Indexer/Indexer.fs similarity index 65% rename from propulsion-cosmos-reactor/Reactor.fs rename to propulsion-indexer/Indexer/Indexer.fs index d3e586ec6..d82e97819 100644 --- a/propulsion-cosmos-reactor/Reactor.fs +++ b/propulsion-indexer/Indexer/Indexer.fs @@ -1,9 +1,9 @@ -module ReactorTemplate.Reactor +module IndexerTemplate.Indexer.Indexer type Outcome = Metrics.Outcome /// Gathers stats based on the Outcome of each Span as it's processed, for periodic emission via DumpStats() -type Stats(log, statsInterval, stateInterval) = +type Stats(log, statsInterval, stateInterval, verboseStore) = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) let 
mutable ok, skipped, na = 0, 0, 0 @@ -18,23 +18,30 @@ type Stats(log, statsInterval, stateInterval) = if ok <> 0 || skipped <> 0 || na <> 0 then log.Information(" used {ok} skipped {skipped} n/a {na}", ok, skipped, na) ok <- 0; skipped <- 0; na <- 0 + Equinox.CosmosStore.Core.Log.InternalMetrics.dump Serilog.Log.Logger + override _.Classify(e) = + match e with + | OutcomeKind.StoreExceptions kind -> kind + | Equinox.CosmosStore.Exceptions.ServiceUnavailable when not verboseStore -> Propulsion.Streams.OutcomeKind.RateLimited + | x -> base.Classify x override _.HandleExn(log, exn) = log.Information(exn, "Unhandled") -// map from external contract to internal contract defined by the aggregate -let toSummaryEventData (x: Contract.SummaryInfo): TodoSummary.Events.SummaryData = +open IndexerTemplate.Domain + +let sourceCategories = Todo.Reactions.categories + +let toSummaryEventData (x: Todo.Fold.State): TodoIndex.Events.SummaryData = { items = [| for x in x.items -> { id = x.id; order = x.order; title = x.title; completed = x.completed } |] } -let reactionCategories = Todo.Reactions.categories - -let handle (sourceService: Todo.Service) (summaryService: TodoSummary.Service) stream events = async { +let handle (sourceService: Todo.Service) (summaryService: TodoIndex.Service) stream events = async { match struct (stream, events) with | Todo.Reactions.ImpliesStateChange clientId -> - let! version', summary = sourceService.QueryWithVersion(clientId, Contract.ofState) - match! summaryService.TryIngest(clientId, version', toSummaryEventData summary) with + let! version', summary = sourceService.QueryWithVersion(clientId, toSummaryEventData) + match! summaryService.TryIngest(clientId, version', summary) with | true -> return Propulsion.Sinks.StreamResult.OverrideNextIndex version', Outcome.Ok (1, events.Length - 1) | false -> return Propulsion.Sinks.StreamResult.OverrideNextIndex version', Outcome.Skipped events.Length | _ -> return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.NotApplicable events.Length } @@ -43,10 +50,5 @@ module Factory = let createHandler store = let srcService = Todo.Factory.create store - let dstService = TodoSummary.Factory.create store + let dstService = TodoIndex.Factory.create store handle srcService dstService - -type Factory private () = - - static member StartSink(log, stats, maxConcurrentStreams, handle, maxReadAhead) = - Propulsion.Sinks.Factory.StartConcurrent(log, maxReadAhead, maxConcurrentStreams, handle, stats) diff --git a/propulsion-indexer/Indexer/Indexer.fsproj b/propulsion-indexer/Indexer/Indexer.fsproj new file mode 100644 index 000000000..78dc99355 --- /dev/null +++ b/propulsion-indexer/Indexer/Indexer.fsproj @@ -0,0 +1,26 @@ + + + + Exe + net6.0 + 5 + true + + + + + + + + + + + + + + + + + + + diff --git a/propulsion-cosmos-reactor/ReactorMetrics.fs b/propulsion-indexer/Indexer/Metrics.fs similarity index 90% rename from propulsion-cosmos-reactor/ReactorMetrics.fs rename to propulsion-indexer/Indexer/Metrics.fs index d41ab35ad..d88f0ff3a 100644 --- a/propulsion-cosmos-reactor/ReactorMetrics.fs +++ b/propulsion-indexer/Indexer/Metrics.fs @@ -1,7 +1,7 @@ -module ReactorTemplate.Metrics +module IndexerTemplate.Indexer.Metrics -let baseName stat = "ReactorTemplate_reactor_" + stat -let baseDesc desc = "ReactorTemplate: Reactor " + desc +let baseName stat = "IndexerTemplate_reactor_" + stat +let baseDesc desc = "IndexerTemplate: Reactor " + desc module private Counter = diff --git a/propulsion-indexer/Indexer/Program.fs 
b/propulsion-indexer/Indexer/Program.fs new file mode 100644 index 000000000..17542de7f --- /dev/null +++ b/propulsion-indexer/Indexer/Program.fs @@ -0,0 +1,205 @@ +module IndexerTemplate.Indexer.Program + +open App +open Serilog +open System + +module Args = + + open Argu + [] + type Parameters = + | [] Verbose + | [] PrometheusPort of int + | [] ProcessorName of string + | [] MaxReadAhead of int + | [] MaxWriters of int + | [] Index of ParseResults + | [] Snapshot of ParseResults + | [] Sync of ParseResults + interface IArgParserTemplate with + member p.Usage = p |> function + | Verbose -> "request Verbose Logging. Default: off." + | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off." + | ProcessorName _ -> "Projector consumer group name." + | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 2." + | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8." + | Index _ -> "Process indexing into the Views Container for the specified Cosmos feed" + | Snapshot _ -> "Process updating of snapshots for all traversed streams in the specified Cosmos feed" + | Sync _ -> "Sync into a specified Store for the specified Cosmos feed" + and Arguments(c: Args.Configuration, p: ParseResults) = + let maxReadAhead = p.GetResult(MaxReadAhead, 2) + let maxConcurrentProcessors = p.GetResult(MaxWriters, 8) + member val Verbose = p.Contains Parameters.Verbose + member val PrometheusPort = p.TryGetResult PrometheusPort + member val ProcessorName = p.GetResult ProcessorName + member val StatsInterval = TimeSpan.FromMinutes 1. + member val StateInterval = TimeSpan.FromMinutes 5. + member x.Cosmos = match x.Action with Action.Index c | Action.Snapshot c -> c | Action.Sync s -> s.Source + member x.ConnectWithFeed(?lsc) = match x.Action with + | Action.Index c | Action.Snapshot c -> c.ConnectWithFeed(?lsc = lsc) + | Action.Sync s -> s.ConnectWithFeed() + member val Action = match p.GetSubCommand() with + | Parameters.Index p -> CosmosArguments(c, p) |> Index + | Parameters.Snapshot p -> CosmosArguments(c, p) |> Snapshot + | Parameters.Sync p -> SyncArguments(c, p) |> Sync + | _ -> p.Raise "Must specify a subcommand" + member x.ActionLabel = match x.Action with Action.Index _ -> "Indexing" | Action.Snapshot _ -> "Snapshotting" | Action.Sync _ -> "Exporting" + member x.IsSnapshotting = match x.Action with Action.Snapshot _ -> true | _ -> false + member x.ProcessorParams() = Log.Information("{action}... {processorName}, reading {maxReadAhead} ahead, {dop} writers", + x.ActionLabel, x.ProcessorName, maxReadAhead, maxConcurrentProcessors) + (x.ProcessorName, maxReadAhead, maxConcurrentProcessors) + and [] Action = Index of CosmosArguments | Snapshot of CosmosArguments | Sync of SyncArguments + and [] SyncParameters = + | [] Connection of string + | [] Database of string + | [] Container of string + | [] LeaseContainerId of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + | [] MaxKiB of int + | [] Source of ParseResults + interface IArgParserTemplate with + member p.Usage = p |> function + | Connection _ -> "specify a connection string for the destination Cosmos account. Default: Same as Source" + | Database _ -> "specify a database name for store. Default: Same as Source" + | Container _ -> "specify a container name for store." + | LeaseContainerId _ -> "store leases in Sync target DB (default: use `-aux` adjacent to the Source Container). 
Enables the Source to be read via a ReadOnly connection string." + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 0." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + | MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128." + | Source _ -> "Source store from which events are to be consumed via the feed" + and SyncArguments(c: Args.Configuration, p: ParseResults) = + let source = CosmosArguments(c, p.GetResult Source) + let discovery = p.TryGetResult SyncParameters.Connection + |> Option.map Equinox.CosmosStore.Discovery.ConnectionString + |> Option.defaultWith (fun () -> source.Discovery) + let timeout = p.GetResult(SyncParameters.Timeout, 5) |> TimeSpan.FromSeconds + let retries = p.GetResult(SyncParameters.Retries, 1) + let maxRetryWaitTime = p.GetResult(SyncParameters.RetriesWaitTime, 5) |> TimeSpan.FromSeconds + let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime) + let database = p.TryGetResult SyncParameters.Database |> Option.defaultWith (fun () -> source.Database) + let container = p.GetResult SyncParameters.Container + member val MaxBytes = p.GetResult(MaxKiB, 128) * 1024 + member val Source = source + member _.ConnectWithFeed() = match p.TryGetResult LeaseContainerId with + | Some localAuxContainerId -> source.ConnectWithFeedReadOnly(connector.CreateUninitialized(), database, localAuxContainerId) + | None -> source.ConnectWithFeed() + member _.Connect() = async { let! context = connector.ConnectExternal("Destination", database, container) + return Equinox.CosmosStore.Core.EventsContext(context, Store.Metrics.log) } + and [] CosmosParameters = + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode + | [] Connection of string + | [] Database of string + | [] Container of string + | [] Views of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + + | [] Verbose + | [] LeaseContainer of string + | [] FromTail + | [] MaxItems of int + | [] LagFreqM of float + interface IArgParserTemplate with + member p.Usage = p |> function + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {Args.CONNECTION} specified)" + | Database _ -> $"specify a database name for store. (optional if environment variable {Args.DATABASE} specified)" + | Container _ -> $"specify a container name for store. (optional if environment variable {Args.CONTAINER} specified)" + | Views _ -> $"specify a container name for views. (optional if environment variable {Args.VIEWS} specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + + | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" + | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `` + `-aux`." + | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." 
+ | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" + and CosmosArguments(c: Args.Configuration, p: ParseResults) = + let discovery = p.TryGetResult CosmosParameters.Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let mode = p.TryGetResult ConnectionMode + let timeout = p.GetResult(Timeout, 5) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5) |> TimeSpan.FromSeconds + let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) + let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let viewsContainerId = p.TryGetResult Views |> Option.defaultWith (fun () -> c.CosmosViews) + + let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") + let fromTail = p.Contains FromTail + let maxItems = p.TryGetResult MaxItems + let lagFrequency = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes + member val Verbose = p.Contains Verbose + member val MonitoringParams = fromTail, maxItems, lagFrequency + member _.Discovery = discovery + member _.Database = database + member _.ConnectWithFeed(?lsc) = connector.ConnectWithFeed(database, containerId, viewsContainerId, leaseContainerId, ?logSnapshotConfig = lsc) + member _.ConnectWithFeedReadOnly(auxClient, auxDatabase, auxContainerId) = + connector.ConnectWithFeedReadOnly(database, containerId, viewsContainerId, auxClient, auxDatabase, auxContainerId) + + /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args + let parse tryGetConfigValue argv: Arguments = + let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + let parser = ArgumentParser.Create(programName=programName) + Arguments(Args.Configuration tryGetConfigValue, parser.ParseCommandLine argv) + +let [] AppName = "IndexerTemplate" + +let build (args: Args.Arguments) = async { + let processorName, maxReadAhead, maxConcurrentStreams = args.ProcessorParams() + let! 
contexts, monitored, leases = args.ConnectWithFeed(args.IsSnapshotting) + let store = (contexts, Equinox.Cache(AppName, sizeMb = 10)) ||> Store.Cosmos.createConfig + let parseFeedDoc, sink = + let mkParseAll () = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents (fun _ -> true) + let mkSink stats handle = Factory.StartSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) + match args.Action with + | Args.Action.Index _ -> + let mkParseCats = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumCategoryEvents + let stats = Indexer.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) + let handle = Indexer.Factory.createHandler store + mkParseCats Indexer.sourceCategories, mkSink stats handle + | Args.Action.Snapshot _ -> + let stats = Snapshotter.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) + let handle = Snapshotter.Factory.createHandler store + mkParseAll (), mkSink stats handle + | Args.Action.Sync a -> + mkParseAll (), + let eventsContext = a.Connect() |> Async.RunSynchronously + let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, args.StatsInterval, args.StateInterval) + Propulsion.CosmosStore.CosmosStoreSink.Start( + Log.Logger, maxReadAhead, eventsContext, maxConcurrentStreams, stats, + purgeInterval = TimeSpan.FromHours 1, maxBytes = a.MaxBytes) + let source = + let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect parseFeedDoc) + let startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams + Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, + startFromTail = startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) + return sink, source } + +open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException + +let run args = async { + let! sink, source = build args + use _metricsServer: IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj + return! [| Async.AwaitKeyboardInterruptAsTaskCanceledException() + source.AwaitWithStopOnCancellation() + sink.AwaitWithStopOnCancellation() + |] |> Async.Parallel |> Async.Ignore } + +[] +let main argv = + try let args = Args.parse EnvVar.tryGet argv + try let metrics = Sinks.equinoxAndPropulsionCosmosConsumerMetrics (Sinks.tags AppName) args.ProcessorName + Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() + try run args |> Async.RunSynchronously; 0 + with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + finally Log.CloseAndFlush() + with Args.MissingArg msg -> eprintfn $"%s{msg}"; 1 + | :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintfn $"Exception %s{e.Message}"; 1 diff --git a/propulsion-indexer/Indexer/README.md b/propulsion-indexer/Indexer/README.md new file mode 100644 index 000000000..c9e689c38 --- /dev/null +++ b/propulsion-indexer/Indexer/README.md @@ -0,0 +1,228 @@ +# Indexer App + +The purpose of the Indexer app is to: +- consume the Main Container's ChangeFeed (container name is configured via `EQUINOX_COSMOS_CONTAINER`) +- ensure all derived information maintained in the Views Container is kept in sync (`EQUINOX_COSMOS_VIEWS`) + +It also implements some additional functions (all of which are based on the consumption of Equinox events from the Main Container via the ChangeFeed). 
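+
+Both container names (along with the connection string and database) are resolved from environment variables by the `Configuration` type in `App/Configuration.fs`. As a rough orientation only (the `dumpContainerConfig` function below is illustrative, not part of the template):
+
+```fsharp
+// Illustrative sketch: how the Indexer resolves its two container names
+// (per App/Configuration.fs and the EnvVar helper in App/Infrastructure.fs)
+let dumpContainerConfig () =
+    let config = App.Args.Configuration(EnvVar.tryGet)
+    printfn "Main (ChangeFeed source): %s" config.CosmosContainer // EQUINOX_COSMOS_CONTAINER
+    printfn "Views (derived state):    %s" config.CosmosViews     // EQUINOX_COSMOS_VIEWS
+```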
+ +# Command: `index` + +:warning: **Writes to the Views Container; Does not write to the Main Store Container**. + +Reacts to each event written to the main store by updating all derived information held in the Views Container. + +From the base of the repo, it's invoked as follows: + + dotnet run --project Indexer -- -g defaultIndexer index + +- `defaultIndexer` is the Processor Name (equivalent to the notion of a Consumer Group in many messaging technologies) + +## Common commandline overrides + +The following overrides can be useful: + +- `-w` overrides the streams parallelism limit (default: `8`). In general, Cosmos rate limiting policy and request latency mean that increasing beyond that is not normally useful. A potential exception is when retraversing: correct, efficient idempotent processing avoids the RU consumption of the writes, so more parallelism can pay off. +- `-r` overrides the maximum read-ahead limit (default: `2`). + - In rare cases (where the I/O involved in processing events is efficient and/or the items are being traversed at a high throughput), increasing the read-ahead limit can help to ensure that there's always something for the processor 'threads' to work on + - reading ahead can help throughput in some cases: multiple events from the same stream are handled as a single span of events to the maximum degree possible, including events from batches beyond the one that's currently in progress. + +There are other settings that relate to the source ChangeFeed (i.e. you specify them after the `index` subcommand name): + +- `-b`: adjusts the maximum batch size (default: `100` items). + + - Lower values induce more frequent checkpointing, and reduce the RU cost of each individual read. + - Higher values increase the RU cost of each batch read and the number of events that need to be re-processed if a partition is re-assigned, or the Indexer App is restarted. + +---- + +# Indexing logic fundamentals + +Over time, the number of events in the system will keep growing. But you still want to be able to reconstitute the derived content and/or add a new variant at any time. + +This has the following implications: +- processing should work on a batched basis +- no permanent state should be held in the Views Container; it's just a structure built up to provide efficient traversal of the data in support of the current use cases and performance needs of both the read and write paths. + +## Key principle: the Views Container is ephemeral + +In a deployed instance, indexing typically runs as a live loop working from the tail of the ChangeFeed. + +However, in a well-written event-sourced system it should be considered absolutely normal to be able to regenerate the views (and/or provision a slightly tweaked version of the existing format) at the drop of a hat. + +It is easy, and should remain easy, to validate that a given indexing function works correctly and efficiently: blow away the Views Container and retraverse on the desktop or in a sandbox environment.
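+
+As a minimal illustration of why this regeneration is always possible (the `TodoStats` shape below is invented for the purpose, and the event names mirror the `Todo` aggregate used elsewhere in these templates), everything held in the Views Container is just the result of a pure fold over events read from the Main Container:
+
+```fsharp
+// Hypothetical summary view: every field is computed purely from the Todo events,
+// so deleting the Views Container loses nothing that re-running `index` cannot rebuild
+type TodoStats = { added: int; updated: int }
+module TodoStats =
+    let initial = { added = 0; updated = 0 }
+    let evolve (s: TodoStats) = function
+        | Todo.Events.Added _ -> { s with added = s.added + 1 }
+        | Todo.Events.Updated _ -> { s with updated = s.updated + 1 }
+        | _ -> s // other event types don't affect this particular summary
+    let fold: TodoStats -> seq<Todo.Events.Event> -> TodoStats = Seq.fold evolve
+```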
+ +:bulb: NOTE this need to be able to re-traverse directly from the store is one reason why using Kafka or some form of Service Bus to consume events indirectly is problematic; you'd need to re-emit all the events to the 'bus'. + +## Example: Completely regenerating the Views Container + +The only real 'state' in the system as a whole is the ChangeFeed processor checkpoint (identified by the Processor Name in the `-g` commandline argument). + +If there's ever doubt about the integrity of the state in the Views Container, you can reconstitute its content at any time: + +1. delete the Views Container +2. re-initialize it with `eqx init` +3. run `index` over all the events from the start of time + +## Example: capturing fields from an event into the Views Container state that were not previously captured + +In general, Indexes should not speculatively hold information that's not being used - writing more than is necessary consumes space and increases RU consumption. + +Instead, where a new feature and/or a tweak to an existing one requires previously un-indexed information, one can simply use an Expand and Contract strategy: +1. Expand to add the extra information + - change the `Category` for the view (e.g. if it was `$TodoIndex`, and a new field is needed, change the `Category` to `$TodoIndex2`) + - add the logic to populate the relevant state + - run the `index` subcommand (with a fresh _Processor Name_) +2. Write the consumer logic +3. Deploy. This can be done in two common ways: + - (if the consumption logic is not behind a feature flag) as part of the deploy, run the `index` before the system goes live + - (if the consumption logic is behind a feature flag): + - after the deploy, run an `index` operation + - when the index has caught up, enable the feature flag that uses the information it provides + +---- + +# Command: `snapshot` + +:warning: **Writes to the Main Store Container; Does not write to the Views Container**. + +This ancillary command is hosted within the Indexer App as it involves the same elements as the normal indexing behavior (reading from the ChangeFeed, using the domain fold logic to establish the state per stream, writing to the store as configured). + +The key difference compared to the `index` subcommand is that it writes to the Main Store Container (as opposed to the Views Container). + +From the base of the repo, it's invoked as follows: + + dotnet run --project Indexer -- -g defaultSnapshotter snapshot + +- `defaultSnapshotter` is the _Processor Name_ (aka Consumer Group Name). + - It should be different to the one used for the Indexing behavior (`defaultIndexer` in the example above). + - To trigger a full retraversal (as opposed to only visiting streams that have been updated since the last `snapshot` traversal), you'll want to use a fresh checkpoint name per run; see [resetting checkpoints](https://github.com/jet/propulsion/blob/doc/DOCUMENTATION.md#resetting-checkpoints) + +## Implementation notes + +Replaces the normal/default Handler with one that: +1. **loads the State of a stream** with the normal application-level `fold` logic appropriate for its category, noting whether the most recent Snapshot Event Type was present. Having completed the load, it does one of the following: + - (if the state was derived from the latest Snapshot Event Type): _no further action is necessary_ + - (if no snapshot is present) Equinox builds the state in the normal manner by `fold`ing all the events +2.
**(if the state was loaded based on the snapshot)**: + - no writes take place + - the Equinox Cache will retain the version and state (and via step `4` below, become Propulsion's updated 'write position' for that stream) + - subsequent reads of events from the ChangeFeed will typically be discarded (as the `Index` will be less than the `Version` that became that stream's write position) + + :bulb: **NOTE:** if a fresh event enters the store after the stream has been visited in a given traversal, but the writer did not write a 'latest' snapshot alongside, the snapshotter would react by immediately adding the snapshot. (For the avoidance of doubt, it's extremely unlikely that this will work out better in terms of RU consumption and/or latency, as the RU impact of a write is a function of the original size plus the updated size.) + + :bulb: **ASIDE:** If a [purge](https://github.com/jet/propulsion/blob/doc/DOCUMENTATION.md#purging) causes the write position to be jettisoned, the cache entry comes into play: the etag in the cached state will likely result in a `304 Not Modified` read roundtrip +3. **(if the state was not derived from the Snapshot Event Type)** + - yields a `Snapshotted` event from the `decide` function (to trigger a store Sync operation rather than the normal short-circuit if there are no events to write) + - uses `AccessStrategy.Custom` to `transmute` this tentative write: + - _from_: the proposed snapshot event that the `decide` function yielded in the preceding step to trigger the store roundtrip (which would ordinarily be written as part of the stream's permanent events, which would be bad) + - _to_: not appending any events, but instead replacing the 0 or many current unfolds with the single desired snapshot + + :bulb: **NOTE:** in order for this to work well, the `CosmosStoreContext` used to do the processing is configured differently to how one normally would; `tipMaxEvents` is set to a number that's effectively infinite: + + - e.g. if you set it to `20` and tried to update the snapshot of a stream with 30 events, a 'calve' would be triggered, resulting in: + + 1. writing a calf Item containing the `30` ejected events + 2. updating the Tip to not have any `e`vents + 3. updating the Tip to have the updated `Snapshotted` event as the single entry in the `u`nfolds field +4. yields a `StreamResult.OverrideNextIndex` based on the observed version of the stream. + + :bulb: Normally, the first visit to the stream is the only one. The exception is if a new write takes place before the processing reaches the end of the ChangeFeed. + +In other aspects (from the perspective of how it traverses the events, checkpoints progress, manages concurrency and/or reading ahead), this subcommand is equivalent to the `index` subcommand; the same overrides apply. + +## Versioning events + +All types used in the body of an event need to be able to be safely round-tripped to/from the Event Store. While events are written exactly once, they will live in the store forever, so they need to be designed carefully.
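+
+As a hypothetical illustration (the field names below follow the Todo aggregate used in these templates; `dueDate` is invented), a compatible evolution of an event body adds an optional field rather than renaming or repurposing an existing one - the general rules are summarized next:
+
+```fsharp
+// Before (already written to the store):
+//   type ItemData = { id: int; order: int; title: string; completed: bool }
+// After: a new optional field; events written before the change deserialize with
+// dueDate = None, and readers built against the old contract simply ignore the new field
+type ItemData = { id: int; order: int; title: string; completed: bool; dueDate: System.DateTimeOffset option }
+```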
+ +As a result, you want to follow the normal principles for changing message contracts safely: +- the most obvious thing not to do is rename fields +- removing fields is OK (as long as you know no reader anywhere is going to fail in their absence) +- adding new fields is OK if they are `option`s (assuming you can do something workable in the case where the value is `None`) + +That said, the most important thing in defining the contract for an event is the semantic intent; if the intent/meaning behind an event is changing, it's normally best to add a new case to the `Event` union to reflect that. + +:bulb: Whenever an Event is no longer relevant to the system as a whole, it's safe to remove it from the `type Event` union - Event Types that don't have cases in the union are simply ignored. + +There's a wide variety of techniques and established strategies, most of which are covered in https://github.com/jet/fscodec#upconversion + +## Versioning snapshots + +The `Snapshotted` event in the `type Event =` union is explicitly tagged with a `Name`: + +```fsharp +| [<System.Runtime.Serialization.DataMember(Name = "Snapshotted")>] Snapshotted of Snapshotted +``` + +As with any Event, all types used in the body (i.e. the entire tree of types used in the `type Snapshotted`) need to be able to be round-tripped safely to/from the Store. + +The same rules as outlined above for Versioning Events apply equally. + +The critical differences between 'real Events' and `Snapshotted` events are: +- `Snapshotted` events are stored in the `u`nfolds field of the Tip, and get replaced with a fresh version every time an event is written +- It's always theoretically safe to remove the `Snapshotted` case from the Union; no data would be lost. However, it does mean that the state will need to be produced from the events, which will likely result in higher latency and RU consumption on the initial load (the Cache means you generally only pay this price once). + +The following general rules apply: + +- CONSIDER per aggregate whether a `Snapshotted` event is appropriate. It may make sense to avoid having a snapshotted stream if streams are short, and you are using Events In Tip mode + - the cache will hold the state, and reads and writes don't pay the cost of conveying a snapshot + - the additional cost of deserializing and folding the events is extremely unlikely to be relevant when considering the overall I/O costs +- ALWAYS change the `Name` _string_ from `Snapshotted` to `Snapshotted/2`, `Snapshotted/3` etc. whenever a breaking change is needed - that removes any risk of a failure to parse the snapshot event body due to changes in the representation. + +Given the above arrangements, you can at any time ensure snapshots have been updated in a given environment by running the following command against the Store: + + ```bash + # establish prod env vars for EQUINOX_COSMOS_CONNECTION, EQUINOX_COSMOS_DATABASE, EQUINOX_COSMOS_CONTAINER etc + dotnet run --project Indexer -- -g deployYYMMDD snapshot + ``` +---- + +# Command: `sync` + +:warning: **Writes to the nominated 'Export' Container; Does not write to the `source` (or the Views Container)**. + +Uses the ChangeFeed to export all the events from the Main Store Container to a nominated target Equinox Container. + +This ancillary command is hosted within the Indexer App (despite involving less of the infrastructure required for the `index` or `snapshot` subcommands) because it is commonly used in conjunction with them.
+ +In addition to being useful on a one-off basis, it can also be run to maintain a constantly in-sync read replica of a given Container. + +From the base of the repo, it's invoked as follows: + + eqx init -A cosmos -c Export # once; make an empty container to Sync into + dotnet run --project Indexer -- -g syncExport sync -c Export source + eqx stats -A cosmos # get stats for Main store + eqx stats -A cosmos -c Export # compare stats for exported version + +- `syncExport` is a _Processor Name_ (aka Consumer Group Name) that is different to the one used for the Indexing behavior (`defaultIndexer` in the example above) +- `source` is a mandatory subcommand of the `sync` command; you can supply the typical arguments to control the ChangeFeed as one would for `index` or `snapshot` + +:bulb: **NOTE:** the Sync process only writes events. Each write will replace any snapshots stored in the `u`nfolds space in the Tip with an empty list. This means your next step after 'exporting' will frequently be to point the `EQUINOX_COSMOS_CONTAINER` at the exported version and then run the `snapshot` subcommand (while the system will work without doing so, each initial load will be less efficient). + +:bulb: **ASIDE:** You could run the `sync` concurrently with the `snapshot`; the outcome would be correct, but it would typically be inefficient as they'd be duelling for the same RU capacity. Also, depending on whether or not the source events are stored compactly (using Events In Tip), the streams may be written to many times in the course of the export (and each of those appends would mark that stream 'dirty' and hence trigger another visit by the `snapshot` processing). + +## Example: exporting to a local database from a production environment + +```bash +EQUINOX_COSMOS_CONNECTION= +EQUINOX_COSMOS_DATABASE=testdb +EQUINOX_COSMOS_CONTAINER=app # note we override this in most cases below with `-c test` +EQUINOX_COSMOS_VIEWS=app-views +SOURCE_COSMOS_KEY= +propulsion init -A cosmos -c app # creates app-aux +eqx init -A cosmos -c app-views +eqx init -A cosmos -c test +# copy from the prod datastore into a temporary (`test`) container +dotnet run --project Indexer -- -w 64 -g defaultSync ` + sync -s $EQUINOX_COSMOS_CONNECTION -c test -a app-aux ` + source -s $SOURCE_COSMOS_KEY -d productiondb -c app -b 1000 +# apply snapshots to the exported database (optional; things will still work without them) +dotnet run --project Indexer -- -g defaultSnapshotter ` + snapshot -c test -a app-aux # note the need to specify app-aux, or it would look for test-aux +# index the data into the app-views store based on the test export +dotnet run --project Indexer -- -g defaultIndexer ` + index -c test -a app-aux -b 9999 +``` + +- `-w 64`: overrides the normal concurrency of 8 +- `-b 9999`: increases the batch size, to reduce the number of ChangeFeed Reader round trips diff --git a/propulsion-indexer/Indexer/Snapshotter.fs b/propulsion-indexer/Indexer/Snapshotter.fs new file mode 100644 index 000000000..feb1b6563 --- /dev/null +++ b/propulsion-indexer/Indexer/Snapshotter.fs @@ -0,0 +1,46 @@ +module IndexerTemplate.Indexer.Snapshotter + +type Outcome = bool + +type Stats(log, statsInterval, stateInterval, verboseStore) = + inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval) + + let mutable handled, skipped = 0, 0 + override _.HandleOk(updated) = if updated then handled <- handled + 1 else skipped <- skipped + 1 + override _.DumpStats() = + base.DumpStats() + log.Information(" Snapshotted {handled}, skipped {skipped}", handled, skipped) + handled <- 0; skipped <- 0 +
Equinox.CosmosStore.Core.Log.InternalMetrics.dump Serilog.Log.Logger + + override _.Classify(e) = + match e with + | OutcomeKind.StoreExceptions kind -> kind + | Equinox.CosmosStore.Exceptions.ServiceUnavailable when not verboseStore -> Propulsion.Streams.OutcomeKind.RateLimited + | x -> base.Classify x + override _.HandleExn(log, exn) = + log.Information(exn, "Unhandled") + +open IndexerTemplate.Domain + +let handle + tryUpdateTodo + stream _events: Async<_ * Outcome> = async { + let! res, pos' = + match stream with + | Todo.Reactions.For id -> tryUpdateTodo id + | sn -> failwith $"Unexpected category %A{sn}" + // a) if the tryUpdate saw a version beyond what (Propulsion.Sinks.Events.nextIndex events) would suggest, then we pass that information out + // in order to have the scheduler drop all events until we meet an event that signifies we may need to re-update + // b) the fact that we use the same Microsoft.Azure.Cosmos.CosmosClient for the Change Feed and the Equinox-based Services means we are guaranteed + // to always see all the _events we've been supplied. (Even if this were not the case, the scheduler would retain the excess events, and that + // would result in an immediate re-triggering of the handler with those events) + return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', res } + +module Factory = + + let createHandler context = + + let todo = Todo.Factory.createSnapshotter context + handle + todo.TryUpdate diff --git a/propulsion-cosmos-reactor/README.md b/propulsion-indexer/README.md similarity index 76% rename from propulsion-cosmos-reactor/README.md rename to propulsion-indexer/README.md index a56544307..8af3db2d6 100644 --- a/propulsion-cosmos-reactor/README.md +++ b/propulsion-indexer/README.md @@ -1,9 +1,9 @@ -# Propulsion CosmosDb ChangeFeedProcessor Reactor +# Equinox/Propulsion Indexer application This project was generated using: dotnet new -i Equinox.Templates # just once, to install/update in the local templates store - dotnet new proCosmosReactor # use --help to see options + dotnet new proIndexer # use --help to see options ## Usage instructions @@ -11,7 +11,8 @@ This project was generated using: $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" # or use -s $env:EQUINOX_COSMOS_DATABASE="equinox-test" # or use -d - $env:EQUINOX_COSMOS_CONTAINER="equinox-test" # or use -c + $env:EQUINOX_COSMOS_CONTAINER="app" # or use -c + $env:EQUINOX_COSMOS_VIEWS="app-views" # or use -c 1. Use the `eqx` tool to initialize a CosmosDb container @@ -27,11 +28,11 @@ This project was generated using: # default name is "($EQUINOX_COSMOS_CONTAINER)-aux" propulsion init -ru 400 cosmos -3. To run an instance of the Reactor from a CosmosDb ChangeFeed +3. To run an instance of the Indexer from a CosmosDb ChangeFeed # `-g default` defines the `processorName` - each processor group name has separated state in the leases store - # `-c MyContainer` specifies the source Container to monitor (if you have specified EQUINOX_COSMOS_* environment + # `-c app` specifies the source Container to monitor (if you have specified EQUINOX_COSMOS_* environment # vars, no connection/database/container arguments are needed.) # For this template, this same container is also used to wire up the Connection used for the Reactions processing. # See the `proReactor` template for a more complex variant that lets you specify them separately. 
- dotnet run -- -g default cosmos -c MyContainer + dotnet run --project Indexer -- -g default cosmos -c app diff --git a/propulsion-indexer/propulsion-indexer.sln b/propulsion-indexer/propulsion-indexer.sln new file mode 100644 index 000000000..6eab0dcd5 --- /dev/null +++ b/propulsion-indexer/propulsion-indexer.sln @@ -0,0 +1,34 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.31903.59 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "Domain\Domain.fsproj", "{63B428A6-2353-4062-B122-CC8D09E3817D}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "App", "App\App.fsproj", "{24BEC43D-EB74-4E09-8752-A807E04CC025}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Indexer", "Indexer\Indexer.fsproj", "{5B884D74-1177-4FB2-9F17-DAF50B6063F0}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {63B428A6-2353-4062-B122-CC8D09E3817D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {63B428A6-2353-4062-B122-CC8D09E3817D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {63B428A6-2353-4062-B122-CC8D09E3817D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {63B428A6-2353-4062-B122-CC8D09E3817D}.Release|Any CPU.Build.0 = Release|Any CPU + {24BEC43D-EB74-4E09-8752-A807E04CC025}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {24BEC43D-EB74-4E09-8752-A807E04CC025}.Debug|Any CPU.Build.0 = Debug|Any CPU + {24BEC43D-EB74-4E09-8752-A807E04CC025}.Release|Any CPU.ActiveCfg = Release|Any CPU + {24BEC43D-EB74-4E09-8752-A807E04CC025}.Release|Any CPU.Build.0 = Release|Any CPU + {5B884D74-1177-4FB2-9F17-DAF50B6063F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5B884D74-1177-4FB2-9F17-DAF50B6063F0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5B884D74-1177-4FB2-9F17-DAF50B6063F0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5B884D74-1177-4FB2-9F17-DAF50B6063F0}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/propulsion-projector/Store.fs b/propulsion-projector/Store.fs index 2ba04ec18..3f6cec893 100644 --- a/propulsion-projector/Store.fs +++ b/propulsion-projector/Store.fs @@ -35,18 +35,6 @@ module Dynamo = let accessStrategy = Equinox.DynamoStore.AccessStrategy.RollingState toSnapshot createCached name codec initial fold accessStrategy (context, cache) -module Esdb = - - let create name codec initial fold (context, cache) = - let accessStrategy = Equinox.EventStoreDb.AccessStrategy.Unoptimized - Equinox.EventStoreDb.EventStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache) - -module Sss = - - let create name codec initial fold (context, cache) = - let accessStrategy = Equinox.SqlStreamStore.AccessStrategy.Unoptimized - Equinox.SqlStreamStore.SqlStreamStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache) - #if esdb [] type Config = diff --git a/propulsion-reactor/Todo.fs b/propulsion-reactor/Todo.fs index b2806d556..62437696b 100644 --- a/propulsion-reactor/Todo.fs +++ b/propulsion-reactor/Todo.fs @@ -49,7 +49,7 @@ module Fold = type State = { items: Events.ItemData list; nextId: int } /// State implied by the absence of any events on this stream let initial = { items = []; nextId = 0 } - /// Compute State 
change implied by a giveC:\Users\f0f00db\Projects\dotnet-templates\propulsion-summary-projector\Todo.fsn Event + /// Compute State change implied by a given Event let evolve s = function | Events.Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 } | Events.Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) } diff --git a/tests/Equinox.Templates.Tests/DotnetBuild.fs b/tests/Equinox.Templates.Tests/DotnetBuild.fs index ff760b038..a4604927a 100644 --- a/tests/Equinox.Templates.Tests/DotnetBuild.fs +++ b/tests/Equinox.Templates.Tests/DotnetBuild.fs @@ -74,7 +74,7 @@ type DotnetBuild(output: ITestOutputHelper, folder: EquinoxTemplatesFixture) = let [] proReactor args = run "proReactor" args let [] proReactorDefault () = run "proReactor" [] - let [] proCosmosReactor () = run "proCosmosReactor" [] + let [] proIndexer () = run "proIndexer" [] let [] proIndexerCdk () = run "proDynamoStoreCdk" [] let [] proHotel () = run "proHotel" []