Skip to content

Commit

Permalink
feat(eqx stats): Oldest, Newest
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 28, 2024
1 parent 32634a0 commit c23f9e4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 39 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.CosmosStore`: Group metrics by Container Name [#449](https://github.com/jet/equinox/pull/449)
- `Equinox.Core.Batcher`: Add Settable `Linger` [#454](https://github.com/jet/equinox/pull/454)
- `Equinox.CosmosStore`: Group metrics by Category; split out `Tip` [#453](https://github.com/jet/equinox/pull/453)
- `eqx stats`: Added `-O` and `-N` to extract oldest and newest `_ts` within a store [#459](https://github.com/jet/equinox/pull/459)
- `eqx`: Added `-Q` flag to omit timestamps from console output logging [#459](https://github.com/jet/equinox/pull/459)

### Changed

- `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.0, 5.0.0)`] [#448](https://github.com/jet/equinox/pull/448)
- `Equinox.MessageDb`: Up min `Npgsql` to v `7.0.7` as `7.0.0` is on CVE blacklist

### Removed

- `eqx stats`: `-A` (all stats) is now the default unless you specify >=1 of the individual stats via `ESDNO` flags [#459](https://github.com/jet/equinox/pull/459)

### Fixed

<a name="4.0.4"></a>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour

```powershell
# run queries to determine how many streams, docs, events there are in the container
eqx -VC stats -SDEP cosmos # -P to run in parallel # -V -C to show underlying query being used
eqx -V stats -P cosmos # -P to run in parallel # -V to show underlying query being used
```

5. Use the `eqx` tool to query streams and/or snapshots in a CosmosDB store
Expand Down
5 changes: 2 additions & 3 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,6 @@ module internal Sync =

module Initialization =

// Note: the Cosmos SDK does not (currently) support changing the Throughput mode of an existing Database or Container.
type [<RequireQualifiedAccess>] Throughput = Manual of rus: int | Autoscale of maxRus: int
type [<RequireQualifiedAccess>] Provisioning = Container of Throughput | Database of Throughput | Serverless
let private (|ThroughputProperties|) = function
Expand All @@ -558,7 +557,7 @@ module Initialization =
| Provisioning.Container _ | Provisioning.Serverless -> createDatabaseIfNotExists client None dName
| Provisioning.Database (ThroughputProperties tp) -> async {
let! d = createDatabaseIfNotExists client (Some tp) dName
let! _ = Async.call (fun ct -> d.ReplaceThroughputAsync(tp, cancellationToken = ct))
do! Async.call (fun ct -> d.ReplaceThroughputAsync(tp, cancellationToken = ct)) |> Async.Ignore
return d }
let private createContainerIfNotExists (d: Database) cp maybeTp = async {
let! r = Async.call (fun ct -> d.CreateContainerIfNotExistsAsync(cp, throughputProperties = Option.toObj maybeTp, cancellationToken = ct))
Expand Down Expand Up @@ -608,7 +607,7 @@ module Initialization =

let initAux (client: CosmosClient) (dName, cName) mode = async {
let! d = createOrProvisionDatabase client dName mode
return! createOrProvisionContainer d (cName, "/id", applyAuxContainerProperties) mode } // as per Cosmos team, Partition Key must be "/id"
return! createOrProvisionContainer d (cName, "/id", applyAuxContainerProperties) mode } // per Cosmos team, Partition Key must be "/id"

/// Per Container, we need to ensure the stored procedure has been created exactly once (per process lifetime)
type internal ContainerInitializerGuard(container: Container, ?initContainer: Container -> CancellationToken -> Task<unit>) =
Expand Down
80 changes: 45 additions & 35 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ let [<Literal>] appName = "equinox-tool"

[<NoEquality; NoComparison>]
type Parameters =
| [<AltCommandLine "-Q"; Unique>] Quiet
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-C"; Unique>] VerboseConsole
| [<AltCommandLine "-S"; Unique>] LocalSeq
Expand All @@ -27,17 +28,18 @@ type Parameters =
| [<CliPrefix(CliPrefix.None); Last>] Query of ParseResults<QueryParameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Quiet -> "Omit timestamps from log output"
| Verbose -> "Include low level logging regarding specific test runs."
| VerboseConsole -> "Include low level test and store actions logging in on-screen output to console."
| LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
| LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)."
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
| LoadTest _ -> "Run a load test"
| Init _ -> "Initialize Store/Container (supports `cosmos` stores; also handles RU/s provisioning adjustment)."
| InitAws _ -> "Initialize DynamoDB Table (supports `dynamo` stores; also handles RU/s provisioning adjustment)."
| InitSql _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)."
| Stats _ -> "inspect store to determine numbers of streams/documents/events and/or config (supports `cosmos` and `dynamo` stores)."
| Query _ -> "Load/Summarise streams based on Cosmos SQL Queries (supports `cosmos` only)."
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
and [<NoComparison; NoEquality; RequireSubcommand>] InitParameters =
| [<AltCommandLine "-ru"; Unique>] Rus of int
| [<AltCommandLine "-A"; Unique>] Autoscale
Expand Down Expand Up @@ -97,16 +99,18 @@ and [<NoComparison; NoEquality; RequireSubcommand>] StatsParameters =
| [<AltCommandLine "-E"; Unique>] Events
| [<AltCommandLine "-S"; Unique>] Streams
| [<AltCommandLine "-D"; Unique>] Documents
| [<AltCommandLine "-A"; Unique>] All
| [<AltCommandLine "-O"; Unique>] Oldest
| [<AltCommandLine "-N"; Unique>] Newest
| [<AltCommandLine "-P"; Unique>] Parallel
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Store.Cosmos.Parameters>
| [<CliPrefix(CliPrefix.None)>] Dynamo of ParseResults<Store.Dynamo.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Events -> "Count the number of Events in the store."
| Streams -> "Count the number of Streams in the store. (Default action if no others supplied)"
| Streams -> "Count the number of Streams in the store."
| Documents -> "Count the number of Documents in the store."
| All -> "Request all available stats (equivalent to -ESD)"
| Oldest -> "Oldest document, based on the _ts field"
| Newest -> "Newest document, based on the _ts field"
| Parallel -> "Run in Parallel (CAREFUL! can overwhelm RU allocations)."
| Cosmos _ -> "Cosmos Connection parameters."
| Dynamo _ -> "Dynamo Connection parameters."
Expand All @@ -125,7 +129,7 @@ and [<NoComparison; NoEquality; RequireSubcommand>] QueryParameters =
member a.Usage = a |> function
| StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
| CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
| CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`."
| CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)."
| UnfoldName _ -> "Specify unfold Name to match against `u.c`, e.g. `Snapshotted`"
| UnfoldCriteria _ -> "Specify constraints on Unfold (reference unfold fields via `u.d.`, top level fields via `c.`), e.g. `u.d.name = \"TenantName1\"`."
| Mode _ -> "readOnly: Only read `u`nfolds, not `_etag`.\n" +
Expand All @@ -139,25 +143,25 @@ and [<NoComparison; NoEquality; RequireSubcommand>] QueryParameters =
and [<RequireQualifiedAccess>] Mode = ReadOnly | ReadWithStream | Default | Raw
and [<RequireQualifiedAccess>] Criteria = SingleStream of string | CatName of string | CatLike of string | Unfiltered
and QueryArguments(p: ParseResults<QueryParameters>) =
member val Mode = p.GetResult(Mode, if p.Contains File then Mode.Raw else Mode.Default)
member val Pretty = p.Contains QueryParameters.Pretty
member val TeeConsole = p.Contains Console
member val Mode = p.GetResult(QueryParameters.Mode, if p.Contains QueryParameters.File then Mode.Raw else Mode.Default)
member val Pretty = p.Contains QueryParameters.Pretty
member val TeeConsole = p.Contains QueryParameters.Console
member val Criteria =
match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
| Some sn, None, None -> Criteria.SingleStream sn
match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with
| Some sn, None, None -> Criteria.SingleStream sn
| Some _, Some _, _
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive"
| None, Some cn, None -> Criteria.CatName cn
| None, None, Some cl -> Criteria.CatLike cl
| None, None, None -> Criteria.Unfiltered
| None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
member val Filepath = p.TryGetResult File
member val UnfoldName = p.TryGetResult UnfoldName
member val UnfoldCriteria = p.TryGetResult UnfoldCriteria
member val CosmosArgs =
match p.GetSubCommand() with
| QueryParameters.Cosmos p -> Store.Cosmos.Arguments p
| x -> p.Raise $"unexpected subcommand %A{x}"
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive"
| None, Some cn, None -> Criteria.CatName cn
| None, None, Some cl -> Criteria.CatLike cl
| None, None, None -> Criteria.Unfiltered
| None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
member val Filepath = p.TryGetResult QueryParameters.File
member val UnfoldName = p.TryGetResult QueryParameters.UnfoldName
member val UnfoldCriteria = p.TryGetResult QueryParameters.UnfoldCriteria
member val CosmosArgs = p.GetResult QueryParameters.Cosmos |> Store.Cosmos.Arguments
member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
| Store.Config.Cosmos (cc, _, _) -> cc.Container
| _ -> p.Raise "Query requires Cosmos"
member x.ConfigureStore(log: ILogger) =
let storeConfig = None, true
Store.Cosmos.config log storeConfig x.CosmosArgs
Expand Down Expand Up @@ -257,11 +261,12 @@ let dumpStats log = function
| Store.Config.Mdb _ -> Equinox.MessageDb.Log.InternalMetrics.dump log
| Store.Config.Memory _ -> ()

let createDomainLog verbose verboseConsole maybeSeqEndpoint =
let createDomainLog quiet verbose verboseConsole maybeSeqEndpoint =
let c = LoggerConfiguration()
let c = if verbose then c.MinimumLevel.Debug() else c
let c = writeToStatsSinks c
let c = let outputTemplate = "{Timestamp:T} {Level:u1} {Message:l} {Properties}{NewLine}{Exception}"
let outputTemplate = if quiet then outputTemplate.Substring(outputTemplate.IndexOf ' ' + 1) else outputTemplate
let consoleLevel = if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information
c.WriteTo.Console(consoleLevel, outputTemplate, theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
Expand Down Expand Up @@ -302,21 +307,27 @@ module CosmosStats =
let run (log : ILogger, _verboseConsole, _maybeSeq) (p : ParseResults<StatsParameters>) =
match p.GetSubCommand() with
| StatsParameters.Cosmos sp ->
let doS, doD, doE =
let all, s, d, e = p.Contains All, p.Contains StatsParameters.Streams, p.Contains Documents, p.Contains StatsParameters.Events
all || s, all || d, all || e
let doS, doD, doE, doO, doN =
let s, d, e, o, n = p.Contains StatsParameters.Streams, p.Contains Documents, p.Contains StatsParameters.Events, p.Contains Oldest, p.Contains Newest
let all = not (s || d || e || o || n)
all || s, all || d, all || e, all || o, all || n
let doS = doS || (not doD && not doE) // default to counting streams only unless otherwise specified
let inParallel = p.Contains Parallel
let connector, dName, cName = CosmosInit.connect log sp
let container = connector.CreateUninitialized().GetContainer(dName, cName)
let ops = [| if doS then "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """
if doD then "Documents", """SELECT VALUE COUNT(1) FROM c"""
if doE then "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """ |]
log.Information("Computing {measures} ({mode})", Seq.map fst ops, (if inParallel then "in parallel" else "serially"))
if doE then "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """
if doO then "Oldest", """SELECT VALUE MIN(c._ts) FROM c"""
if doN then "Newest", """SELECT VALUE MAX(c._ts) FROM c""" |]
let render = if log.IsEnabled LogEventLevel.Debug then snd else fst
log.Information("Computing {measures} ({mode})", Seq.map render ops, (if inParallel then "in parallel" else "serially"))
ops |> Seq.map (fun (name, sql) -> async {
log.Debug("Running query: {sql}", sql)
let res = container.QueryValue<int>(sql) |> Async.AwaitTaskCorrect |> Async.RunSynchronously
log.Information("{stat}: {result:N0}", name, res)})
let res = container.QueryValue<int64>(sql) |> Async.AwaitTaskCorrect |> Async.RunSynchronously
match name with
| "Oldest" | "Newest" -> log.Information("{stat,-10}: {result,13} ({d:u})", name, res, DateTime.UnixEpoch.AddSeconds(float res))
| _ -> log.Information("{stat,-10}: {result,13:N0}", name, res) })
|> if inParallel then Async.Parallel else Async.Sequential
|> Async.Ignore<unit[]>
| StatsParameters.Dynamo sp -> async {
Expand Down Expand Up @@ -504,9 +515,9 @@ module Dump =
| _ -> if doT then "n/a" else "0"
prevTs <- Some x.Timestamp
if not doC then log.Information("{i,4}@{t:u}+{d,9} {u:l} {e:l} {data:l} {meta:l}",
x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta)
x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta)
else log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}",
x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta)
x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta)
match streamBytes with ValueNone -> () | ValueSome x -> log.Information("ISyncContext.StreamEventBytes {kib:n1}KiB", float x / 1024.) }
resetStats ()
let streams = p.GetResults DumpParameters.Stream
Expand All @@ -522,9 +533,8 @@ module Dump =

type Arguments(p: ParseResults<Parameters>) =
let maybeSeq = if p.Contains LocalSeq then Some "http://localhost:5341" else None
let verbose = p.Contains Verbose
let verboseConsole = p.Contains VerboseConsole
member _.CreateDomainLog() = createDomainLog verbose verboseConsole maybeSeq
let quiet, verbose, verboseConsole = p.Contains Quiet, p.Contains Verbose, p.Contains VerboseConsole
member _.CreateDomainLog() = createDomainLog quiet verbose verboseConsole maybeSeq
member _.ExecuteSubCommand() = async {
match p.GetSubCommand() with
| Init a -> do! CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None |> Async.AwaitTaskCorrect
Expand Down

0 comments on commit c23f9e4

Please sign in to comment.