Skip to content

Commit

Permalink
Cosmos core events API (#49), remove rolling snapshots
Browse files Browse the repository at this point in the history
* Reorganize, adding Batch structure
* Rework stored procedure
* Remove rolling snapshots
* Add explicit non-indexed mode
  • Loading branch information
bartelink committed Nov 26, 2018
1 parent 56ba4b9 commit 5ca1d1f
Show file tree
Hide file tree
Showing 17 changed files with 1,553 additions and 970 deletions.
89 changes: 56 additions & 33 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

open Argu
open Domain
open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.EventStore
open Infrastructure
open Serilog
Expand Down Expand Up @@ -119,7 +119,7 @@ module Cosmos =
let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let createGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(maxBatchSize = batchSize))
let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems))

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Store =
Expand Down Expand Up @@ -147,22 +147,19 @@ module Test =
else None
let eqxCache =
if targs.Contains Cached then
let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create
| Store.Cosmos (gateway, databaseId, connectionId) ->
if targs.Contains Indexed then
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = cache)
.Create(databaseId, connectionId, streamName)
else
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots snapshot, ?caching = cache)
.Create(databaseId, connectionId, streamName)
let store = EqxStore(gateway, EqxCollections(databaseId, connectionId))
if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection snapshot, ?caching = eqxCache).Create
else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand All @@ -172,34 +169,41 @@ module Test =

[<AutoOpen>]
module SerilogHelpers =
let (|CosmosReadRu|CosmosWriteRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
let inline (|Stats|) ({ interval = i; ru = ru }: Equinox.Cosmos.Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds
let (|CosmosReadRu|CosmosWriteRu|CosmosResyncRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
match evt with
| Equinox.Cosmos.Log.Index { ru = ru }
| Equinox.Cosmos.Log.IndexNotFound { ru = ru }
| Equinox.Cosmos.Log.IndexNotModified { ru = ru }
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Forward,_, { ru = ru })
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_, { ru = ru }) -> CosmosReadRu ru
| Equinox.Cosmos.Log.WriteSuccess {ru = ru }
| Equinox.Cosmos.Log.WriteConflict {ru = ru } -> CosmosWriteRu ru
| Equinox.Cosmos.Log.Index (Stats s)
| Equinox.Cosmos.Log.IndexNotFound (Stats s)
| Equinox.Cosmos.Log.IndexNotModified (Stats s)
| Equinox.Cosmos.Log.Batch (_,_, (Stats s)) -> CosmosReadRu s
| Equinox.Cosmos.Log.WriteSuccess (Stats s)
| Equinox.Cosmos.Log.WriteConflict (Stats s) -> CosmosWriteRu s
| Equinox.Cosmos.Log.WriteResync (Stats s) -> CosmosResyncRu s
// slices are rolled up into batches so be sure not to double-count
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Direction.Forward,{ ru = ru })
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Direction.Backward,{ ru = ru }) -> CosmosSliceRu ru
| Equinox.Cosmos.Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru
let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
let (|CosmosMetric|_|) (logEvent : LogEvent) : Equinox.Cosmos.Log.Event option =
match logEvent.Properties.TryGetValue("cosmosEvt") with
| true, SerilogScalar (:? Equinox.Cosmos.Log.Event as e) -> Some e
| _ -> None
type RuCounter =
{ mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
member __.Ingest (ru, ms) =
Interlocked.Increment(&__.count) |> ignore
Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
Interlocked.Add(&__.ms, ms) |> ignore
type RuCounterSink() =
static let mutable readX10 = 0L
static let mutable writeX10 = 0L
static member Read = readX10 / 10L
static member Write = writeX10 / 10L
static member val Read = RuCounter.Create()
static member val Write = RuCounter.Create()
static member val Resync = RuCounter.Create()
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| CosmosMetric (CosmosReadRu ru) -> Interlocked.Add(&readX10, int64 (ru*10.)) |> ignore
| CosmosMetric (CosmosWriteRu ru) -> Interlocked.Add(&writeX10, int64 (ru*10.)) |> ignore
| CosmosMetric (CosmosReadRu stats) -> RuCounterSink.Read.Ingest stats
| CosmosMetric (CosmosWriteRu stats) -> RuCounterSink.Write.Ingest stats
| CosmosMetric (CosmosResyncRu stats) -> RuCounterSink.Resync.Ingest stats
| _ -> ()

let createStoreLog verbose verboseConsole maybeSeqEndpoint =
Expand Down Expand Up @@ -260,7 +264,7 @@ let main argv =
let resultFile = createResultLog report
for r in results do
resultFile.Information("Aggregate: {aggregate}", r)
log.Information("Run completed; Current memory allocation: {bytes:n0}", GC.GetTotalMemory(true))
log.Information("Run completed; Current memory allocation: {bytes:n2}MB", (GC.GetTotalMemory(true) |> float) / 1024./1024.)
0

match args.GetSubCommand() with
Expand Down Expand Up @@ -309,15 +313,34 @@ let main argv =
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
let rus = args.GetResult(Rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus)
Equinox.Cosmos.Initialization.initialize conn.Client dbName collName rus |> Async.RunSynchronously
log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus)
Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let res = runTest log conn targs
let read, write = RuCounterSink.Read, RuCounterSink.Write
let total = read+write
log.Information("Total Request Charges sustained in test: {totalRus:n0} (R:{readRus:n0}, W:{writeRus:n0})", total, read, write)
let stats =
[ "Read", RuCounterSink.Read
"Write", RuCounterSink.Write
"Resync", RuCounterSink.Resync ]
let mutable totalCount, totalRc, totalMs = 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count))
for name, stat in stats do
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalRc <- totalRc + ru
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
logActivity "TOTAL" totalCount totalRc totalMs
let measures : (string * (TimeSpan -> float)) list =
[ "s", fun x -> x.TotalSeconds
"m", fun x -> x.TotalMinutes
"h", fun x -> x.TotalHours ]
let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru)
let duration = targs.GetResult(DurationM,1.) |> TimeSpan.FromMinutes
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d)
res
| _ -> failwith "init or run is required"
| _ -> failwith "ERROR: please specify memory, es or cosmos Store"
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Folds =
let toSnapshot (s: State) : Events.Compaction.State =
{ items = [| for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } |] }
let ofCompacted (s: Events.Compaction.State) : State =
{ items = [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
{ items = if s.items = null then [] else [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
let initial = { items = [] }
let evolve (state : State) event =
let updateItems f = { state with items = f state.items }
Expand Down
4 changes: 3 additions & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Events =
type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool }
type Value = { email : string; preferences : Preferences }

let [<Literal>] EventTypeName = "contactPreferencesChanged"
type Event =
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
| [<System.Runtime.Serialization.DataMember(Name = EventTypeName)>]Updated of Value
interface TypeShape.UnionContract.IUnionContract
let eventTypeNames = System.Collections.Generic.HashSet<string>([EventTypeName])

module Folds =
type State = Events.Preferences
Expand Down
18 changes: 9 additions & 9 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.CartIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -22,10 +22,10 @@ let resolveGesStreamWithRollingSnapshots gateway =
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveEqxStreamWithProjection gateway =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection snapshot).Create
let resolveEqxStreamWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -71,13 +71,13 @@ type Tests(testOutputHelper) =
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithoutCompactionSemantics
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithCompactionEventType
let ``Can roundtrip against Cosmos, correctly folding the events with With Projection`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithProjection
do! act service args
}
14 changes: 7 additions & 7 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.ContactPreferencesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -21,10 +21,10 @@ let resolveStreamGesWithOptimizedStorageSemantics gateway =
let resolveStreamGesWithoutAccessStrategy gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create
let resolveStreamEqxWithoutCompactionSemantics gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -63,12 +63,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics
do! act service args
}
6 changes: 3 additions & 3 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.FavoritesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -21,7 +21,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceEqx gateway log =
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down Expand Up @@ -58,7 +58,7 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createEqxGateway conn defaultBatchSize
let gateway = createEqxStore conn defaultBatchSize
let service = createServiceEqx gateway log
do! act service args
}
18 changes: 10 additions & 8 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ module EquinoxCosmosInterop =
let action, metric, batches, ru =
match evt with
| Log.WriteSuccess m -> "EqxAppendToStreamAsync", m, None, m.ru
| Log.WriteConflict m -> "EqxAppendToStreamAsync", m, None, m.ru
| Log.WriteConflict m -> "EqxAppendToStreamConflictAsync", m, None, m.ru
| Log.WriteResync m -> "EqxAppendToStreamResyncAsync", m, None, m.ru
| Log.Slice (Direction.Forward,m) -> "EqxReadStreamEventsForwardAsync", m, None, m.ru
| Log.Slice (Direction.Backward,m) -> "EqxReadStreamEventsBackwardAsync", m, None, m.ru
| Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru
Expand Down Expand Up @@ -117,13 +118,14 @@ type Tests() =
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, hooking, extracting and substituting metrics in the logging information`` context cartId skuId = Async.RunSynchronously <| async {
let buffer = ResizeArray<string>()
let ``Can roundtrip against Cosmos, hooking, extracting and substituting metrics in the logging information`` context skuId = Async.RunSynchronously <| async {
let batchSize = defaultBatchSize
let (log,capture) = createLoggerWithMetricsExtraction buffer.Add
let buffer = ConcurrentQueue<string>()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createEqxGateway conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithCompactionEventType gateway)
let itemCount, cartId = batchSize / 2 + 1, cartId ()
do! act buffer capture service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
let gateway = createEqxStore conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithProjection gateway)
let itemCount = batchSize / 2 + 1
let cartId = Guid.NewGuid() |> CartId
do! act buffer service itemCount context cartId skuId "Eqx Index " // one is a 404, one is a 200
}
Loading

0 comments on commit 5ca1d1f

Please sign in to comment.