Skip to content

Commit

Permalink
Multi-event batches
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 23, 2018
1 parent a9ff593 commit 8be11e9
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 163 deletions.
11 changes: 7 additions & 4 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ and [<NoEquality; NoComparison>] CosmosArguments =
| [<AltCommandLine("-d")>] Database of string
| [<AltCommandLine("-c")>] Collection of string
| [<AltCommandLine("-rt")>] RetriesWaitTime of int
| [<AltCommandLine("-a")>] PageSize of int

| [<CliPrefix(CliPrefix.None)>] Provision of ParseResults<CosmosProvisionArguments>
| [<CliPrefix(CliPrefix.None)>] Run of ParseResults<TestArguments>
Expand All @@ -89,6 +90,7 @@ and [<NoEquality; NoComparison>] CosmosArguments =
| Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)."
| Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)"
| PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)"
| Provision _ -> "Initialize a store collection."
| Run _ -> "Run a load test."
and CosmosProvisionArguments =
Expand Down Expand Up @@ -119,7 +121,7 @@ module Cosmos =
let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems))
let createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents))

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Store =
Expand Down Expand Up @@ -306,9 +308,10 @@ let main argv =
let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test")
let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds
let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5)
log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection}. " +
let pageSize = sargs.GetResult(PageSize,1)
log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection} with page size: {pageSize}. " +
"Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}",
connUri, dbName, collName, timeout, retries, maxRetryWaitTime)
connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime)
let conn = Cosmos.connect log discovery timeout operationThrottling |> Async.RunSynchronously
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
Expand All @@ -317,7 +320,7 @@ let main argv =
Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName)
let res = runTest log conn targs
let stats =
[ "Read", RuCounterSink.Read
Expand Down
8 changes: 4 additions & 4 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ let resolveStreamGesWithCompactionSemantics gateway =
let resolveStreamGesWithoutCompactionSemantics gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithCompactionSemantics gateway =
let resolveStreamEqxWithKnownEventTypeSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create
let resolveStreamEqxWithoutCompactionSemantics gateway =
let resolveStreamEqxWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
Expand Down Expand Up @@ -63,12 +63,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithKnownEventTypeSemantics
do! act service args
}
Loading

0 comments on commit 8be11e9

Please sign in to comment.