From 086dbba8e66215b41141a75874e2aeebbdecbde3 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 23 Nov 2018 17:50:56 +0000 Subject: [PATCH] Multi-event batches --- cli/Equinox.Cli/Program.fs | 11 +- src/Equinox.Cosmos/Cosmos.fs | 198 ++++++++---------- .../CosmosCoreIntegration.fs | 58 +++-- .../JsonConverterTests.fs | 9 +- 4 files changed, 123 insertions(+), 153 deletions(-) diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 1dd951d8e..e39068d00 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -77,6 +77,7 @@ and [] CosmosArguments = | [] Database of string | [] Collection of string | [] RetriesWaitTime of int + | [] PageSize of int | [] Provision of ParseResults | [] Run of ParseResults @@ -89,6 +90,7 @@ and [] CosmosArguments = | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)." | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" + | PageSize _ -> "Specify maximum number of events to record on a page before switching to a new one (default: 1)" | Provision _ -> "Initialize a store collection." | Run _ -> "Run a load test." and CosmosProvisionArguments = @@ -119,7 +121,7 @@ module Cosmos = let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) = EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime) .Connect("equinox-cli", discovery) - let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems)) + let createGateway connection (maxItems,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems, maxEventsPerSlice=maxEvents)) [] type Store = @@ -306,9 +308,10 @@ let main argv = let collName = sargs.GetResult(Collection, defaultArg (read "EQUINOX_COSMOS_COLLECTION") "equinox-test") let timeout = sargs.GetResult(Timeout,5.) |> float |> TimeSpan.FromSeconds let (retries, maxRetryWaitTime) as operationThrottling = sargs.GetResult(Retries, 1), sargs.GetResult(RetriesWaitTime, 5) - log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection}. " + + let pageSize = sargs.GetResult(PageSize,1) + log.Information("Using CosmosDb Connection {connection} Database: {database} Collection: {collection} with page size: {pageSize}. 
" + "Request timeout: {timeout} with {retries} retries; throttling MaxRetryWaitTime {maxRetryWaitTime}", - connUri, dbName, collName, timeout, retries, maxRetryWaitTime) + connUri, dbName, collName, pageSize, timeout, retries, maxRetryWaitTime) let conn = Cosmos.connect log discovery timeout operationThrottling |> Async.RunSynchronously match sargs.TryGetSubCommand() with | Some (Provision args) -> @@ -317,7 +320,7 @@ let main argv = Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously 0 | Some (Run targs) -> - let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName) + let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName) let res = runTest log conn targs let stats = [ "Read", RuCounterSink.Read diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index d3b7e477d..089e7145e 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -93,7 +93,7 @@ open Newtonsoft.Json /// A 'normal' (frozen, not Pending) Batch of Events, without any Projections type [] - Event = + Batch = { /// DocDb-mandated Partition Key, must be maintained within the document /// Not actually required if running in single partition mode, but for simplicity, we always write it p: string // "{streamName}" @@ -112,7 +112,21 @@ type [] /// Same as `id`; necessitated by fact that it's not presently possible to do an ORDER BY on the row key i: int64 // {index} - /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) + /// The events at this offset in the stream + e: BatchEvent[] } + /// Unless running in single partion mode (which would restrict us to 10GB per collection) + /// we need to nominate a partition key that will be in every document + static member PartitionKeyField = "p" + /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use + static member IndexedFields = [Batch.PartitionKeyField; "i"] + /// If we encounter a -1 doc, we're interested in its etag so we can re-read for one RU + member x.TryToPosition() = + if x.id <> WipBatch.WellKnownDocumentId then None + else Some { index = (let ``x.e.LongLength`` = 1L in x.i+``x.e.LongLength``); etag = match x._etag with null -> None | x -> Some x } +/// A single event from the array held in a batch +and [] + BatchEvent = + { /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) 
c: System.DateTimeOffset // ISO 8601 /// The Event Type, used to drive deserialization @@ -126,24 +140,14 @@ type [] [)>] [] m: byte[] } // optional - /// Unless running in single partion mode (which would restrict us to 10GB per collection) - /// we need to nominate a partition key that will be in every document - static member PartitionKeyField = "p" - /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use - static member IndexedFields = [Event.PartitionKeyField; "i"] - /// If we encounter a -1 doc, we're interested in its etag so we can re-read for one RU - member x.TryToPosition() = - if x.id <> WipBatch.WellKnownDocumentId then None - else Some { index = (let ``x.e.LongLength`` = 1L in x.i+``x.e.LongLength``); etag = match x._etag with null -> None | x -> Some x } /// The Special 'Pending' Batch Format /// NB this Type does double duty as /// a) transport for when we read it -/// b) a way of encoding a batch that the stored procedure will write in to the actual document +/// b) a way of encoding a batch that the stored procedure will write in to the actual document (`i` is -1 until Stored Proc computes it) /// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch -/// a) `id` and `i` = `-1` as WIP document currently always is -/// b) events are retained as in an `e` array, not top level fields -/// c) contains projections (`c`) +/// a) `id` = `-1` +/// b) contains projections (`c`) and [] WipBatch = { /// Partition key, as per Batch @@ -158,7 +162,7 @@ and [] _etag: string /// base 'i' value for the Events held herein - _i: int64 + i: int64 /// Events e: BatchEvent[] @@ -168,24 +172,7 @@ and [] /// arguably this should be a high nember to reflect fact it is the freshest ? static member WellKnownDocumentId = "-1" /// Create Position from [Wip]Batch record context (facilitating 1 RU reads) - member x.ToPosition() = { index = x._i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x } -/// A single event from the array held in a batch -and [] - BatchEvent = - { /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) 
- c: System.DateTimeOffset // ISO 8601 - - /// The Event Type, used to drive deserialization - t: string // required - - /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb - [)>] - d: byte[] // required - - /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing) - [)>] - [] - m: byte[] } // optional + member x.ToPosition() = { index = x.i+x.e.LongLength; etag = match x._etag with null -> None | x -> Some x } /// Projection based on the state at a given point in time `i` and Projection = { /// Base: Max index rolled into this projection @@ -210,7 +197,7 @@ type Enum() = static member Events (b:WipBatch) = b.e |> Seq.mapi (fun offset x -> { new IOrderedEvent with - member __.Index = b._i + int64 offset + member __.Index = b.i + int64 offset member __.IsProjection = false member __.EventType = x.t member __.Data = x.d @@ -223,14 +210,8 @@ type Enum() = member __.EventType = x.t member __.Data = x.d member __.Meta = x.m }) - static member Event (x:Event) = - Seq.singleton - { new IOrderedEvent with - member __.Index = x.i - member __.IsProjection = false - member __.EventType = x.t - member __.Data = x.d - member __.Meta = x.m } + static member Events (b:Batch) = + Enum.Events (b.i, b.e) static member Projections (xs: Projection[]) = seq { for x in xs -> { new IOrderedEvent with member __.Index = x.i @@ -239,7 +220,10 @@ type Enum() = member __.Data = x.d member __.Meta = x.m } } static member EventsAndProjections (x:WipBatch): IOrderedEvent seq = - Enum.Projections x.c + Enum.Events x + |> Seq.append (Enum.Projections x.c) + // where Index is equal, projections get delivered after the events so the fold semantics can be 'idempotent' + |> Seq.sortBy (fun x -> x.Index, x.IsProjection) /// Reference to Collection and name that will be used as the location for the stream type [] CollectionStream = { collectionUri: System.Uri; name: string } with @@ -348,14 +332,14 @@ module Sync = // NB don't nest in a private module, or serialization will fail miserably ;) [] type SyncResponse = { etag: string; nextI: int64; conflicts: BatchEvent[] } - let [] sprocName = "EquinoxSync-SingleEvents-021" // NB need to renumber for any breaking change + let [] sprocName = "EquinoxSync001" // NB need to renumber for any breaking change let [] sprocBody = """ // Manages the merging of the supplied Request Batch, fulfilling one of the following end-states // 1 Verify no current WIP batch, the incoming `req` becomes the WIP batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO) // 2 Current WIP batch has space to accommodate the incoming projections (req.c) and events (req.e) - merge them in, replacing any superseded projections // 3. 
Current WIP batch would become too large - remove WIP state from active document by replacing the well known id with a correct one; proceed as per 1 -function sync(req, expectedVersion) { +function sync(req, expectedVersion, maxEvents) { if (!req) throw new Error("Missing req argument"); const collection = getContext().getCollection(); const collectionLink = collection.getSelfLink(); @@ -363,18 +347,18 @@ function sync(req, expectedVersion) { // Locate the WIP (-1) batch (which may not exist) const wipDocId = collection.getAltLink() + "/docs/" + req.id; - const isAccepted = collection.readDocument(wipDocId, {}, function (err, current) { + const isAccepted = collection.readDocument(wipDocId, {}, function (err, current, options) { // Verify we dont have a conflicting write if (expectedVersion === -1) { executeUpsert(current); } else if (!current && expectedVersion !== 0) { // If there is no WIP page, the writer has no possible reason for writing at an index other than zero response.setBody({ etag: null, nextI: 0, conflicts: [] }); - } else if (current && expectedVersion !== current._i + current.e.length) { + } else if (current && expectedVersion !== current.i + current.e.length) { // Where possible, we extract conflicting events from e and/or c in order to avoid another read cycle // yielding [] triggers the client to go loading the events itself - const conflicts = expectedVersion < current._i ? [] : current.e.slice(expectedVersion - current._i); - const nextI = current._i + current.e.length; + const conflicts = expectedVersion < current.i ? [] : current.e.slice(expectedVersion - current.i); + const nextI = current.i + current.e.length; response.setBody({ etag: current._etag, nextI: nextI, conflicts: conflicts }); } else { executeUpsert(current); @@ -385,18 +369,27 @@ function sync(req, expectedVersion) { function executeUpsert(current) { function callback(err, doc) { if (err) throw err; - response.setBody({ etag: doc._etag, nextI: doc._i + doc.e.length, conflicts: null }); + response.setBody({ etag: doc._etag, nextI: doc.i + doc.e.length, conflicts: null }); } - // If we have hit a sensible limit for a slice in the WIP document, trim the events - if (current && current.e.length + req.e.length > 10) { - current._i = current._i + current.e.length; - current.e = req.e; - current.c = req.c; + // If we have hit a sensible limit for a slice, swap to a new one + if (current && current.e.length + req.e.length > maxEvents) { + // remove the well-known `id` value identifying the batch as being WIP + current.id = current.i.toString(); + // ... 
As it's no longer a WIP batch, we definitely don't want projections taking up space + delete current.c; + + // TODO Carry forward: + // - `c` items not present in `batch`, + // - their associated `c` items with `x:true` + // - any required `e` items from the page being superseded (as `c` items with `x:true`]) // as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place - finalize(current); - const isAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); - if (!isAccepted) throw new Error("Unable to restart WIP batch."); + const wipUpdateAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); + if (!wipUpdateAccepted) throw new Error("Unable to remove WIP markings from WIP batch."); + + req.i = current.i + current.e.length + const isAccepted = collection.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback); + if (!isAccepted) throw new Error("Unable to create WIP batch."); } else if (current) { // Append the new events into the current batch Array.prototype.push.apply(current.e, req.e); @@ -405,39 +398,13 @@ function sync(req, expectedVersion) { // TODO: should remove only projections being superseded // as we've mutated the document in a manner that can conflict with other writers, out write needs to be contingent on no competing updates having taken place - finalize(current); const isAccepted = collection.replaceDocument(current._self, current, { etag: current._etag }, callback); if (!isAccepted) throw new Error("Unable to replace WIP batch."); } else { - current = req; - current._i = 0; - // concurrency control is by virtue of fact that any conflicting writer will encounter a primary key violation (which will result in a retry) - finalize(current); - const isAccepted = collection.createDocument(collectionLink, current, { disableAutomaticIdGeneration: true }, callback); + req.i = 0 + const isAccepted = collection.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback); if (!isAccepted) throw new Error("Unable to create WIP batch."); } - for (i = 0; i < req.e.length; i++) { - const e = req.e[i]; - const eventI = current._i + current.e.length - req.e.length + i; - const doc = { - p: req.p, - id: eventI.toString(), - i: eventI, - c: e.c, - t: e.t, - d: e.d, - m: e.m - }; - const isAccepted = collection.createDocument(collectionLink, doc, function (err) { - if (err) throw err; - }); - if (!isAccepted) throw new Error("Unable to add event " + doc.i); - } - } - - function finalize(current) { - current.i = -1; - current.id = current.i.toString(); } }""" @@ -447,14 +414,14 @@ function sync(req, expectedVersion) { | Conflict of Position * events: IOrderedEvent[] | ConflictUnknown of Position - let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: WipBatch) + let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: WipBatch, maxEvents: int) : Async = async { let sprocLink = sprintf "%O/sprocs/%s" stream.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(stream.name)) let! ct = Async.CancellationToken let ev = match expectedVersion with Some ev -> Position.FromI ev | None -> Position.FromAppendAtEnd let! 
(res : Client.StoredProcedureResponse) = - client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box req, box ev.index) |> Async.AwaitTaskCorrect + client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box req, box ev.index, box maxEvents) |> Async.AwaitTaskCorrect let newPos = { index = res.Response.nextI; etag = Option.ofObj res.Response.etag } return res.RequestCharge, res.Response.conflicts |> function @@ -463,7 +430,7 @@ function sync(req, expectedVersion) { | [||] -> Result.ConflictUnknown newPos | xs -> Result.Conflict (newPos, Enum.Events (ev.index, xs) |> Array.ofSeq) } - let private logged client (stream: CollectionStream) (expectedVersion, req: WipBatch) (log : ILogger) + let private logged client (stream: CollectionStream) (expectedVersion, req: WipBatch, maxEvents) (log : ILogger) : Async = async { let verbose = log.IsEnabled Events.LogEventLevel.Debug let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataProjections req.c else log @@ -472,7 +439,7 @@ function sync(req, expectedVersion) { let writeLog = log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion |> Log.prop "count" req.e.Length |> Log.prop "pcount" req.c.Length - let! t, (ru,result) = run client stream (expectedVersion, req) |> Stopwatch.Time + let! t, (ru,result) = run client stream (expectedVersion, req, maxEvents) |> Stopwatch.Time let resultLog = let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } let logConflict () = writeLog.Information("Eqx TrySync Conflict writing {eventTypes}", [| for x in req.e -> x.t |]) @@ -493,7 +460,7 @@ function sync(req, expectedVersion) { let call = logged client pk batch Log.withLoggedRetries retryPolicy "writeAttempt" call log let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) projections: WipBatch = - { p = stream.name; id = Store.WipBatch.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null + { p = stream.name; id = Store.WipBatch.WellKnownDocumentId; i = -1L(*Server-managed*); _etag = null e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |] c = Array.ofSeq projections } let mkProjections baseIndex (projectionEvents: IEvent seq) : Store.Projection seq = @@ -508,7 +475,7 @@ function sync(req, expectedVersion) { let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async { let pkd = PartitionKeyDefinition() - pkd.Paths.Add(sprintf "/%s" Store.Event.PartitionKeyField) + pkd.Paths.Add(sprintf "/%s" Store.Batch.PartitionKeyField) let colld = DocumentCollection(Id = collName, PartitionKey = pkd) colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent @@ -517,7 +484,7 @@ function sync(req, expectedVersion) { // Given how long and variable the blacklist would be, we whitelist instead colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|] // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors - colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Event.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |] + colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |] let! 
coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect return coll.Resource.Id } @@ -572,35 +539,35 @@ module private Index = let private mkQuery (client : IDocumentClient) maxItems (stream: CollectionStream) (direction: Direction) (startPos: Position option) = let querySpec = match startPos with - | None -> SqlQuerySpec("SELECT * FROM c WHERE c.i!=-1 ORDER BY c.i " + if direction = Direction.Forward then "ASC" else "DESC") + | None -> SqlQuerySpec("SELECT * FROM c ORDER BY c.i " + if direction = Direction.Forward then "ASC" else "DESC") | Some p -> let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC" - SqlQuerySpec("SELECT * FROM c WHERE c.i != -1 AND " + f, SqlParameterCollection [SqlParameter("@id", p.index)]) + SqlQuerySpec("SELECT * FROM c WHERE " + f, SqlParameterCollection [SqlParameter("@id", p.index)]) let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems) - client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() + client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() // Unrolls the Batches in a response - note when reading backawards, the events are emitted in reverse order of index - let private handleSlice direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery) (log: ILogger) + let private handleSlice direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery) (log: ILogger) : Async = async { let! ct = Async.CancellationToken - let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time + let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let batches, ru = Array.ofSeq res, res.RequestCharge - let events = batches |> Seq.collect Enum.Event |> Array.ofSeq + let events = batches |> Seq.collect Enum.Events |> Array.ofSeq + if direction = Direction.Backward then Array.Reverse events // NB no Seq.rev in old FSharp.Core let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } - // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses - let log = if batches.Length = 0 && count = 0 && ru = 0. 
then log else let evt = Log.Slice (direction, reqMetric) in log |> Log.event evt + let evt = Log.Slice (direction, reqMetric) let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i }) - (log |> Log.prop "startIndex" (match startPos with Some { index = i } -> Nullable i | _ -> Nullable()) |> Log.prop "bytes" bytes) + (log |> Log.prop "startIndex" (match startPos with Some { index = i } -> Nullable i | _ -> Nullable()) |> Log.prop "bytes" bytes |> Log.event evt) .Information("Eqx {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}", "Query", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition()) return events, maybePosition, ru } - let private runQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) + let private runQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) (maxPermittedBatchReads: int option) - (query: IDocumentQuery) + (query: IDocumentQuery) : AsyncSeq = let rec loop batchCount : AsyncSeq = asyncSeq { match maxPermittedBatchReads with @@ -618,9 +585,8 @@ module private Index = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let action = match direction with Direction.Forward -> "LoadF" | Direction.Backward -> "LoadB" - // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses - let log = if count = 0 && ru = 0. then log else let evt = Log.Event.Batch (direction, responsesCount, reqMetric) in log |> Log.event evt - (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize).Information( + let evt = Log.Event.Batch (direction, responsesCount, reqMetric) + (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize |> Log.event evt).Information( "Eqx {action:l} {stream} v{nextI} {count}/{responses} {ms}ms rc={ru}", action, streamName, nextI, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) @@ -721,19 +687,23 @@ type EqxConnection(client: IDocumentClient, ?readRetryPolicy (*: (int -> Async<' member __.ReadRetryPolicy = readRetryPolicy member __.WriteRetryPolicy = writeRetryPolicy -/// Defines the policies in force regarding how to constrain query responses +/// Defines the policies in force regarding how to a) split up calls b) limit the number of events per slice type EqxBatchingPolicy ( // Max items to request in query response. Defaults to 10. ?defaultMaxItems : int, // Dynamic version of `defaultMaxItems`, allowing one to react to dynamic configuration changes. Default to using `defaultMaxItems` ?getDefaultMaxItems : unit -> int, /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited. - ?maxRequests) = + ?maxRequests, + /// Maximum number of events to accumualte within the `WipBatch` before switching to a new one when adding Events. Defaults to 10. 
+ ?maxEventsPerSlice) = let getdefaultMaxItems = defaultArg getDefaultMaxItems (fun () -> defaultArg defaultMaxItems 10) /// Limit for Maximum number of `Batch` records in a single query batch response member __.MaxItems = getdefaultMaxItems () /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices` member __.MaxRequests = maxRequests + /// Maximum number of events to accumulate within the `WipBatch` before switching to a new one when adding Events + member __.MaxEventsPerSlice = defaultArg maxEventsPerSlice 10 type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = let eventTypesPredicate resolved = @@ -781,7 +751,7 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = let! res = __.Read log None stream Direction.Forward (Some pos) (fun _ -> false) return LoadFromTokenResult.Found res } member __.Sync log stream (expectedVersion, batch: Store.WipBatch): Async = async { - let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch) + let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch,batching.MaxEventsPerSlice) match wr with | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (Token.create stream pos',events) | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') @@ -1060,9 +1030,13 @@ type EqxContext /// Defaults to 10 ?defaultMaxItems, /// Alternate way of specifying defaultMaxItems which facilitates reading it from a cached dynamic configuration - ?getDefaultMaxItems) = + ?getDefaultMaxItems, + /// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered. + /// Defaults to 100 + ?maxEventsPerSlice) = let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10 let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems) + let batching = EqxBatchingPolicy(getDefaultMaxItems=getDefaultMaxItems, maxEventsPerSlice=defaultArg maxEventsPerSlice 100) let gateway = EqxGateway(conn, batching) let maxCountPredicate count = diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 6247d5d12..a4eb0cc67 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -32,7 +32,7 @@ type Tests(testOutputHelper) = incr testIterations sprintf "events-%O-%i" name !testIterations let mkContextWithItemLimit conn defaultBatchSize = - EqxContext(conn,collections,log,?defaultMaxItems=defaultBatchSize) + EqxContext(conn,collections,log,?defaultMaxItems=defaultBatchSize,maxEventsPerSlice=10) let mkContext conn = mkContextWithItemLimit conn None let verifyRequestChargesMax rus = @@ -48,7 +48,7 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName index <| EventData.Create(0,1) test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 14 // observed 12.03 // was 10 + verifyRequestChargesMax 10 // Clear the counters capture.Clear() @@ -107,28 +107,25 @@ type Tests(testOutputHelper) = capture.Clear() let mutable pos = 0L - let ae = false // TODO fix bug for appendBatchSize in [4; 5; 9] do - if ae then - let! res = Events.appendAtEnd ctx streamName <| EventData.Create (int pos,appendBatchSize) - pos <- pos + int64 appendBatchSize - //let! 
res = Events.append ctx streamName pos (Array.replicate appendBatchSize event) - test <@ [EqxAct.Append] = capture.ExternalCalls @> - pos =! res - else - let! res = Events.append ctx streamName pos <| EventData.Create (int pos,appendBatchSize) - pos <- pos + int64 appendBatchSize - //let! res = Events.append ctx streamName pos (Array.replicate appendBatchSize event) - test <@ [EqxAct.Append] = capture.ExternalCalls @> - AppendResult.Ok pos =! res - verifyRequestChargesMax 50 // was 20, observed 41.64 // 15.59 observed + let! res = Events.appendAtEnd ctx streamName <| EventData.Create (int pos,appendBatchSize) + test <@ [EqxAct.Append] = capture.ExternalCalls @> + pos <- pos + int64 appendBatchSize + pos =! res + verifyRequestChargesMax 20 // 15.59 observed + capture.Clear() + + let! res = Events.getNextIndex ctx streamName + test <@ [EqxAct.Index] = capture.ExternalCalls @> + verifyRequestChargesMax 2 + pos =! res capture.Clear() let! res = Events.appendAtEnd ctx streamName <| EventData.Create (int pos,42) pos <- pos + 42L pos =! res test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 180 // observed 167.32 // was 20 + verifyRequestChargesMax 20 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -140,11 +137,10 @@ type Tests(testOutputHelper) = // Demonstrate benefit/mechanism for using the Position-based API to avail of the etag tracking let stream = ctx.CreateStream streamName - let max = 2000 // observed to time out server side // WAS 5000 - let extrasCount = match extras with x when x * 100 > max -> max | x when x < 1 -> 1 | x -> x*100 + let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 let! _pos = ctx.NonIdempotentAppend(stream, EventData.Create (int pos,extrasCount)) test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 7000 // 6867.7 observed // was 300 // 278 observed + verifyRequestChargesMax 300 // 278 observed capture.Clear() let! pos = ctx.Sync(stream,?position=None) @@ -176,7 +172,7 @@ type Tests(testOutputHelper) = let! 
res = Events.append ctx streamName 0L expected test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 12 // was 10, observed 10.57 + verifyRequestChargesMax 10 capture.Clear() // Try overwriting it (a competing consumer would see the same) @@ -186,7 +182,7 @@ type Tests(testOutputHelper) = | AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e | x -> x |> failwithf "Unexpected %A" test <@ [EqxAct.Resync] = capture.ExternalCalls @> - verifyRequestChargesMax 5 // observed 4.21 // was 4 + verifyRequestChargesMax 4 capture.Clear() } @@ -206,7 +202,7 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res test <@ List.replicate 2 EqxAct.SliceForward @ [EqxAct.BatchForward] = capture.ExternalCalls @> - verifyRequestChargesMax 8 // observed 6.14 // was 3 + verifyRequestChargesMax 3 } [] @@ -221,9 +217,9 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res - // 2 items atm + // 2 Slices this time test <@ [EqxAct.SliceForward; EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @> - verifyRequestChargesMax 7 // observed 6.14 // was 6 + verifyRequestChargesMax 6 } [] @@ -240,7 +236,7 @@ type Tests(testOutputHelper) = // TODO [implement and] prove laziness test <@ List.replicate 2 EqxAct.SliceForward @ [EqxAct.BatchForward] = capture.ExternalCalls @> - verifyRequestChargesMax 10 // observed 8.99 // was 3 + verifyRequestChargesMax 3 } (* Backward *) @@ -248,19 +244,19 @@ type Tests(testOutputHelper) = [] let getBackwards (TestStream streamName) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log - let ctx = mkContextWithItemLimit conn (Some 2) + let ctx = mkContextWithItemLimit conn (Some 1) let! expected = add6EventsIn2Batches ctx streamName // We want to skip reading the last - let expected = Array.take 5 expected + let expected = Array.take 5 expected |> Array.tail - let! res = Events.getBackwards ctx streamName 4L 5 + let! res = Events.getBackwards ctx streamName 4L 4 verifyCorrectEventsBackward 4L expected res - test <@ List.replicate 3 EqxAct.SliceBackward @ [EqxAct.BatchBackward] = capture.ExternalCalls @> - verifyRequestChargesMax 10 // observed 8.98 // was 3 + test <@ List.replicate 2 EqxAct.SliceBackward @ [EqxAct.BatchBackward] = capture.ExternalCalls @> + verifyRequestChargesMax 3 } // TODO AsyncSeq version diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs index 06b795722..fd685af60 100644 --- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs +++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs @@ -21,12 +21,9 @@ type VerbatimUtf8Tests() = [] let ``encodes correctly`` () = let encoded = mkUnionEncoder().Encode(A { embed = "\"" }) - let e : Store.Event = - { p = "streamName"; id = string 0; i = 0L; _etag=null - c = DateTimeOffset.MinValue - t = encoded.caseName - d = encoded.payload - m = null } + let e : Store.Batch = + { p = "streamName"; id = string 0; i = 0L; _etag = null + e = [| { c = DateTimeOffset.MinValue; t = encoded.caseName; d = encoded.payload; m = null } |] } let res = JsonConvert.SerializeObject(e) test <@ res.Contains """"d":{"embed":"\""}""" @>
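
As a rough usage sketch (an illustration, not part of the patch): the maxEventsPerSlice knob introduced above can be supplied either via EqxBatchingPolicy (as createGateway in cli/Equinox.Cli/Program.fs now does) or via EqxContext (as the integration tests do). The snippet below assumes conn/collections/log are established as in those call sites; the stream name and the EventData.Create helper (borrowed from the integration test infrastructure) are placeholders.

    // Sketch: allow up to 10 events to accumulate in the WIP (-1) batch document before the sync
    // stored procedure freezes it into a numbered Batch and starts a fresh WIP document.
    let sample conn collections (log : Serilog.ILogger) = async {
        let ctx = EqxContext(conn, collections, log, defaultMaxItems = 10, maxEventsPerSlice = 10)
        let streamName = "samples-sketch-1"                                    // hypothetical stream
        let! pos = Events.appendAtEnd ctx streamName (EventData.Create(0, 6))  // appended into the WIP batch
        let! next = Events.getNextIndex ctx streamName                         // cheap re-read via the -1 doc's etag
        return pos, next }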