From da5d45f6fe033848d6b2ee5eea235661b9bda852 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 20 Nov 2018 13:04:56 +0000 Subject: [PATCH] Add getNextIndex, flesh out and namespace APIs --- cli/Equinox.Cli/Program.fs | 10 +- samples/Store/Integration/CartIntegration.fs | 2 +- .../ContactPreferencesIntegration.fs | 2 +- .../Store/Integration/FavoritesIntegration.fs | 2 +- samples/Store/Integration/LogIntegration.fs | 1 - src/Equinox.Cosmos/Cosmos.fs | 1102 +++++++++-------- ...ntegration.fs => CosmosCoreIntegration.fs} | 112 +- .../CosmosFixtures.fs | 4 +- .../CosmosFixturesInfrastructure.fs | 37 +- .../CosmosIntegration.fs | 13 +- .../Equinox.Cosmos.Integration.fsproj | 2 +- 11 files changed, 718 insertions(+), 569 deletions(-) rename tests/Equinox.Cosmos.Integration/{CosmosEventsIntegration.fs => CosmosCoreIntegration.fs} (67%) diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 15eae10c0..af985dbc8 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -2,7 +2,7 @@ open Argu open Domain -open Equinox.Cosmos +open Equinox.Cosmos.Builder open Equinox.EventStore open Infrastructure open Serilog @@ -121,7 +121,7 @@ module Cosmos = let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) = EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime) .Connect("equinox-cli", discovery) - let createGateway connection (batchSize,pageSize) = EqxGateway(connection, EqxBatchingPolicy(getMaxBatchSize = (fun () -> batchSize), maxEventsPerSlice = pageSize)) + let createGateway connection (maxBatches,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxSlices=maxBatches, maxEventsPerSlice = maxEvents)) [] type Store = @@ -144,8 +144,8 @@ module Test = let createFavoritesService store (targs: ParseResults) log = let cache = if targs.Contains Cached then - let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50) - Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some + let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50) + Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some else None let resolveStream streamName = match store with @@ -315,7 +315,7 @@ let main argv = | Some (Provision args) -> let rus = args.GetResult(Rus) log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus) - Equinox.Cosmos.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously + Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously 0 | Some (Run targs) -> let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName) diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 4a0fea511..ecd69d7eb 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -1,6 +1,6 @@ module Samples.Store.Integration.CartIntegration -open Equinox.Cosmos +open Equinox.Cosmos.Builder open Equinox.Cosmos.Integration open Equinox.EventStore open Equinox.MemoryStore diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index a4ff983d1..318abb1f7 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -1,6 +1,6 @@ module Samples.Store.Integration.ContactPreferencesIntegration -open Equinox.Cosmos +open Equinox.Cosmos.Builder open Equinox.Cosmos.Integration open Equinox.EventStore open Equinox.MemoryStore diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index a7c51913b..7b21144ca 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -1,6 +1,6 @@ module Samples.Store.Integration.FavoritesIntegration -open Equinox.Cosmos +open Equinox.Cosmos.Builder open Equinox.Cosmos.Integration open Equinox.EventStore open Equinox.MemoryStore diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index c717fcdb2..b199d870e 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -21,7 +21,6 @@ module EquinoxEsInterop = { action = action; stream = metric.stream; interval = metric.interval; bytes = metric.bytes; count = metric.count; batches = batches } module EquinoxCosmosInterop = open Equinox.Cosmos - open Equinox.Cosmos.Store [] type FlatMetric = { action: string; stream: string; interval: StopwatchInterval; bytes: int; count: int; batches: int option; ru: float } with override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index a81467476..4c2b33514 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -1,270 +1,245 @@ -namespace Equinox.Cosmos +namespace Equinox.Cosmos.Internal.Json -open Equinox -open Equinox.Store -open FSharp.Control -open Microsoft.Azure.Documents +open Newtonsoft.Json.Linq open Newtonsoft.Json -open Serilog -open System -//open Faults - -[] -module Json = - open Newtonsoft.Json.Linq - - /// Manages injecting prepared json into the data being submitted to DocDb as-is, on the basis we can trust it to be valid json as DocDb will need it to be - type VerbatimUtf8JsonConverter() = - inherit JsonConverter() - - override __.ReadJson(reader, _, _, _) = - let token = JToken.Load(reader) - if token.Type = JTokenType.Object then token.ToString() |> System.Text.Encoding.UTF8.GetBytes |> box - else Array.empty |> box - - override __.CanConvert(objectType) = - typeof.Equals(objectType) - - override __.WriteJson(writer, value, serializer) = - let array = value :?> byte[] - if array = null || Array.length array = 0 then serializer.Serialize(writer, null) - else writer.WriteRawValue(System.Text.Encoding.UTF8.GetString(array)) - - open System.IO - open System.IO.Compression - /// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc - /// Only applied to snapshots in the Index - type Base64ZipUtf8JsonConverter() = - inherit JsonConverter() - let pickle (input : byte[]) : string = - if input = null then null else - - use output = new MemoryStream() - use compressor = new DeflateStream(output, CompressionLevel.Optimal) - compressor.Write(input,0,input.Length) - compressor.Close() - Convert.ToBase64String(output.ToArray()) - let unpickle str : byte[] = - if str = null then null else - - let compressedBytes = Convert.FromBase64String str - use input = new MemoryStream(compressedBytes) - use decompressor = new DeflateStream(input, CompressionMode.Decompress) - use output = new MemoryStream() - decompressor.CopyTo(output) - output.ToArray() - - override __.CanConvert(objectType) = - typeof.Equals(objectType) - override __.ReadJson(reader, _, _, serializer) = - //( if reader.TokenType = JsonToken.Null then null else - serializer.Deserialize(reader, typedefof) :?> string |> unpickle |> box - override __.WriteJson(writer, value, serializer) = - let pickled = value |> unbox |> pickle - serializer.Serialize(writer, pickled) -[] -module private DocDb = - /// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions - let (|AggregateException|) (exn : exn) = - let rec aux (e : exn) = - match e with - | :? AggregateException as agg when agg.InnerExceptions.Count = 1 -> - aux agg.InnerExceptions.[0] - | _ -> e - - aux exn - /// DocumentDB Error HttpStatusCode extractor - let (|DocDbException|_|) (e : exn) = - match e with - | AggregateException (:? DocumentClientException as dce) -> Some dce - | _ -> None - /// Map Nullable to Option - let (|HasValue|Null|) (x:Nullable<_>) = - if x.HasValue then HasValue x.Value - else Null - /// DocumentDB Error HttpStatusCode extractor - let (|DocDbStatusCode|_|) (e : DocumentClientException) = - match e.StatusCode with - | HasValue x -> Some x - | Null -> None +/// Manages injecting prepared json into the data being submitted to DocDb as-is, on the basis we can trust it to be valid json as DocDb will need it to be +type VerbatimUtf8JsonConverter() = + inherit JsonConverter() + + override __.ReadJson(reader, _, _, _) = + let token = JToken.Load(reader) + if token.Type = JTokenType.Object then token.ToString() |> System.Text.Encoding.UTF8.GetBytes |> box + else Array.empty |> box + + override __.CanConvert(objectType) = + typeof.Equals(objectType) + + override __.WriteJson(writer, value, serializer) = + let array = value :?> byte[] + if array = null || Array.length array = 0 then serializer.Serialize(writer, null) + else writer.WriteRawValue(System.Text.Encoding.UTF8.GetString(array)) + +open System.IO +open System.IO.Compression + +/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc +/// Only applied to snapshots in the Index +type Base64ZipUtf8JsonConverter() = + inherit JsonConverter() + let pickle (input : byte[]) : string = + if input = null then null else + + use output = new MemoryStream() + use compressor = new DeflateStream(output, CompressionLevel.Optimal) + compressor.Write(input,0,input.Length) + compressor.Close() + System.Convert.ToBase64String(output.ToArray()) + let unpickle str : byte[] = + if str = null then null else + + let compressedBytes = System.Convert.FromBase64String str + use input = new MemoryStream(compressedBytes) + use decompressor = new DeflateStream(input, CompressionMode.Decompress) + use output = new MemoryStream() + decompressor.CopyTo(output) + output.ToArray() + + override __.CanConvert(objectType) = + typeof.Equals(objectType) + override __.ReadJson(reader, _, _, serializer) = + //( if reader.TokenType = JsonToken.Null then null else + serializer.Deserialize(reader, typedefof) :?> string |> unpickle |> box + override __.WriteJson(writer, value, serializer) = + let pickled = value |> unbox |> pickle + serializer.Serialize(writer, pickled) + +namespace Equinox.Cosmos.Events + +open Equinox.Store.Infrastructure // Option shims for downlevel frameworks + +/// Common form for either a raw Event or a Projection +type IEvent = + /// The Event Type, used to drive deserialization + abstract member EventType : string + /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb + abstract member Data : byte[] + /// Optional metadata (null, or same as d, not written if missing) + abstract member Meta : byte[] + +/// Represents an Event or Projection and its relative position in the event sequence +type IOrderedEvent = + inherit IEvent + /// The index into the event sequence of this event + abstract member Index : int64 + /// Indicates whether this is a primary event or a projection based on the events <= up to `Index` + abstract member IsProjection: bool + +/// Position and Etag to which an operation is relative +type [] Position = { index: int64; etag: string option } with + /// Just Do It mode + static member internal FromAppendAtEnd = Position.FromIndexOnly -1L + /// Create Position from [Wip]Batch record context facilitating 1 RU reads + static member internal FromDocument(i: int64, etag: string) = { index = i; etag = Option.ofObj etag } + /// NB very inefficient compared to FromDocument or using one already returned to you + static member internal FromIndexOnly(i: int64) = { index = i; etag = None } + /// If we have strong reason to suspect a stream is empty empty, we won't have an etag (and Writer Stored Procedure special cases this) + static member FromEmptyStream = Position.FromIndexOnly 0L + +/// Reference to Storage Partition +type [] Stream = { collectionUri: System.Uri; name: string } with + static member Create(collectionUri, name) = { collectionUri = collectionUri; name = name } + +namespace Equinox.Cosmos.Store + +open Equinox.Cosmos.Events +open Newtonsoft.Json - type ReadResult<'T> = Found of 'T | NotFound | NotModified - type DocDbCollection(client : IDocumentClient, collectionUri) = - member __.TryReadDocument(documentId : string, ?options : Client.RequestOptions): Async> = async { - let! ct = Async.CancellationToken - let options = defaultArg options null - let docLink = sprintf "%O/docs/%s" collectionUri documentId - try let! document = async { return! client.ReadDocumentAsync<'T>(docLink, options = options, cancellationToken = ct) |> Async.AwaitTaskCorrect } - if document.StatusCode = System.Net.HttpStatusCode.NotModified then return document.RequestCharge, NotModified - // NB `.Document` will NRE if a IfNoneModified precondition triggers a NotModified result - else return document.RequestCharge, Found document.Document - with - | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.NotFound as e) -> return e.RequestCharge, NotFound - // NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens - | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified } +/// A 'normal' (frozen, not Pending) Batch of Events, without any Projections +type [] + Batch = + { /// DocDb-mandated Partition Key, must be maintained within the document + /// Not actually required if running in single partition mode, but for simplicity, we always write it + p: string // "{streamName}" + + /// DocDb-mandated unique row key; needs to be unique within any partition it is maintained; must be string + /// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it + /// NB WipBatch uses a well known value here while it's actively 'open' + id: string // "{index}" + + /// Same as `id`; necessitated by fact that it's not presently possible to do an ORDER BY on the row key + i: int64 // {index} + + /// The events at this offset in the stream + e: BatchEvent[] } + /// Unless running in single partion mode (which would restrict us to 10GB per collection) + /// we need to nominate a partition key that will be in every document + static member PartitionKeyField = "p" + /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use + static member IndexedFields = [Batch.PartitionKeyField; "i"] +/// A single event from the array held in a batch +and [] + BatchEvent = + { /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) + c: System.DateTimeOffset // ISO 8601 -module Store = - /// Reference to Storage Partition - type [] Stream = { collectionUri: Uri; name: string } with - static member Create(collectionUri, name) = { collectionUri = collectionUri; name = name } - /// Position and Etag to which an operation is relative - type [] Position = { index: int64; etag: string option } with - /// Create Position from [Wip]Batch record context facilating 1 RU reads - static member FromDocument(i: int64, etag: string) = { index = i; etag = Option.ofObj etag } - /// NB very inefficient compared to FromDocument - static member FromIndexOnly(i: int64) = { index = i; etag = None } - /// If we know the stream is empty, we won't have an etag (and Writer Stored Procedure special cases this) - static member FromEmptyStream() = Position.FromIndexOnly 0L - /// Just Do It mode - static member FromAppendAtEnd = Position.FromIndexOnly -1L - - [] - type Direction = Forward | Backward with - override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" - - /// A 'normal' (frozen, not Pending) Batch of Events, without any Projections - type [] - Batch = - { /// DocDb-mandated Partition Key, must be maintained within the document - /// Not actually required if running in single partition mode, but for simplicity, we always write it - p: string // "{streamName}" - - /// DocDb-mandated unique row key; needs to be unique within any partition it is maintained; must be string - /// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it - /// NB WipBatch uses a well known value here while it's actively 'open' - id: string // "{index}" - - /// Same as `id`; necessitated by fact that it's not presently possible to do an ORDER BY on the row key - i: int64 // {index} - - /// The events at this offset in the stream - e: BatchEvent[] } - /// Unless running in single partion mode (which would restrict us to 10GB per collection) - /// we need to nominate a partition key that will be in every document - static member PartitionKeyField = "p" - /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use - static member IndexedFields = [Batch.PartitionKeyField; "i"] - /// A single event from the array held in a batch - and [] - BatchEvent = - { /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) - c: DateTimeOffset // ISO 8601 - - /// The Event Type, used to drive deserialization - t: string // required - - /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb - [)>] - d: byte[] // required - - /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing) - [)>] - [] - m: byte[] } // optional - - /// The Special 'Pending' Batch Format - /// NB this Type does double duty as - /// a) transport for when we read it - /// b) a way of encoding a batch that the stored procedure will write in to the actual document - /// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch - /// a) `id` = `-1` - /// b) contains projections (`c`) - /// c) `i` is temporarily -1 and filled in by the server when this is used as the write batch request format - type [] - WipBatch = - { /// Partition key, as per Batch - p: string // "{streamName}" - /// Document Id within partition, as per Batch - id: string // "{-1}" - Well known IdConstant used while this remains the pending batch - - /// When we read, we need to capture the value so we can retain it for caching purposes - /// NB this is not relevant to fill in when we pass it to the writing stored procedure - /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - [] - _etag: string - - /// base 'i' value for the Events held herein - i: int64 - - /// Events - e: BatchEvent[] - - /// Projections - c: Projection[] } - /// arguably this should be a high nember to reflect fact it is the freshest ? - static member WellKnownDocumentId = "-1" - /// Projection based on the state at a given point in time `i` - and [] - Projection = - { /// Base: Max index rolled into this projection - i: int64 - - ///// Indicates whether this is actually an event being retained to support a lagging projection - //x: bool - - /// The Event Type of this compaction/snapshot, used to drive deserialization - t: string // required - - /// Event body - Json -> UTF-8 -> Deflate -> Base64 - [)>] - d: byte[] // required - - /// Optional metadata, same encoding as `d` (can be null; not written if missing) - [)>] - [] - m: byte[] } // optional - - /// Common form for either a raw Event or a Projection - type IEvent = /// The Event Type, used to drive deserialization - abstract member EventType : string + t: string // required + /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb - abstract member Data : byte[] - /// Optional metadata (null, or same as d, not written if missing) - abstract member Meta : byte[] - - /// Represents an Event or Projection and its relative position in the event sequence - type IOrderedEvent = - inherit IEvent - /// The index into the event sequence of this event - abstract member Index : int64 - /// Indicates whether this is a primary event or a projection based on the events <= up to `Index` - abstract member IsProjection: bool - - type Enum() = - static member Events (b:WipBatch) = - b.e |> Seq.mapi (fun offset x -> + [)>] + d: byte[] // required + + /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing) + [)>] + [] + m: byte[] } // optional + +/// The Special 'Pending' Batch Format +/// NB this Type does double duty as +/// a) transport for when we read it +/// b) a way of encoding a batch that the stored procedure will write in to the actual document +/// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch +/// a) `id` = `-1` +/// b) contains projections (`c`) +/// c) `i` is temporarily -1 and filled in by the server when this is used as the write batch request format +type [] + WipBatch = + { /// Partition key, as per Batch + p: string // "{streamName}" + /// Document Id within partition, as per Batch + id: string // "{-1}" - Well known IdConstant used while this remains the pending batch + + /// When we read, we need to capture the value so we can retain it for caching purposes + /// NB this is not relevant to fill in when we pass it to the writing stored procedure + /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed + [] + _etag: string + + /// base 'i' value for the Events held herein + i: int64 + + /// Events + e: BatchEvent[] + + /// Projections + c: Projection[] } + /// arguably this should be a high nember to reflect fact it is the freshest ? + static member WellKnownDocumentId = "-1" +/// Projection based on the state at a given point in time `i` +and [] + Projection = + { /// Base: Max index rolled into this projection + i: int64 + + ///// Indicates whether this is actually an event being retained to support a lagging projection + //x: bool + + /// The Event Type of this compaction/snapshot, used to drive deserialization + t: string // required + + /// Event body - Json -> UTF-8 -> Deflate -> Base64 + [)>] + d: byte[] // required + + /// Optional metadata, same encoding as `d` (can be null; not written if missing) + [)>] + [] + m: byte[] } // optional + +type Enum() = + static member Events (b:WipBatch) = + b.e |> Seq.mapi (fun offset x -> + { new IOrderedEvent with + member __.Index = b.i + int64 offset + member __.IsProjection = false + member __.EventType = x.t + member __.Data = x.d + member __.Meta = x.m }) + static member Events (i: int64, e:BatchEvent[]) = + e |> Seq.mapi (fun offset x -> { new IOrderedEvent with - member __.Index = b.i + int64 offset + member __.Index = i + int64 offset member __.IsProjection = false member __.EventType = x.t member __.Data = x.d member __.Meta = x.m }) - static member Events (i: int64, e:BatchEvent[]) = - e |> Seq.mapi (fun offset x -> - { new IOrderedEvent with - member __.Index = i + int64 offset - member __.IsProjection = false - member __.EventType = x.t - member __.Data = x.d - member __.Meta = x.m }) - static member Events (b:Batch) = - Enum.Events (b.i, b.e) - static member Projections (xs: Projection[]) = seq { - for x in xs -> { new IOrderedEvent with - member __.Index = x.i - member __.IsProjection = true - member __.EventType = x.t - member __.Data = x.d - member __.Meta = x.m } } - static member EventsAndProjections (x:WipBatch): IOrderedEvent seq = - Enum.Events x - |> Seq.append (Enum.Projections x.c) - // where Index is equal, projections get delivered after the events so the fold semantics can be 'idempotent' - |> Seq.sortBy (fun x -> x.Index, x.IsProjection) + static member Events (b:Batch) = + Enum.Events (b.i, b.e) + static member Projections (xs: Projection[]) = seq { + for x in xs -> { new IOrderedEvent with + member __.Index = x.i + member __.IsProjection = true + member __.EventType = x.t + member __.Data = x.d + member __.Meta = x.m } } + static member EventsAndProjections (x:WipBatch): IOrderedEvent seq = + Enum.Events x + |> Seq.append (Enum.Projections x.c) + // where Index is equal, projections get delivered after the events so the fold semantics can be 'idempotent' + |> Seq.sortBy (fun x -> x.Index, x.IsProjection) + +namespace Equinox.Cosmos + +open Equinox +open Equinox.Cosmos.Events +open Equinox.Cosmos.Store +open Equinox.Store +open FSharp.Control +open Microsoft.Azure.Documents +open Serilog +open System + +[] +type Direction = Forward | Backward with + override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" -open Store +[] +type AccessStrategy<'event,'state> = + | EventsAreState + | //[] + RollingSnapshots of eventType: string * compact: ('state -> 'event) + | IndexedSearch of (string -> bool) * index: ('state -> 'event seq) module Log = [] @@ -275,7 +250,7 @@ module Log = | WriteResync of Measurement | WriteConflict of Measurement /// Individual read request in a Batch - | Slice of Store.Direction * Measurement + | Slice of Direction * Measurement /// Individual read request for the Index | Index of Measurement /// Individual read request for the Index, not found @@ -283,15 +258,15 @@ module Log = /// Index read with Single RU Request Charge due to correct use of etag in cache | IndexNotModified of Measurement /// Summarizes a set of Slices read together - | Batch of Store.Direction * slices: int * Measurement + | Batch of Direction * slices: int * Measurement let prop name value (log : ILogger) = log.ForContext(name, value) - let propData name (events: #Store.IEvent seq) (log : ILogger) = + let propData name (events: #IEvent seq) (log : ILogger) = let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (System.Text.Encoding.UTF8.GetString e.Data) } log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) let propEvents = propData "events" - let propEventsBatch (x : Store.Batch) log = log |> propData "events" (Store.Enum.Events x) - let propEventsWipBatch (x : Store.WipBatch) log = log |> propData "events" (Store.Enum.Events x) - let propDataProjections = Store.Enum.Projections >> propData "projections" + let propEventsBatch (x : Batch) log = log |> propData "events" (Enum.Events x) + let propEventsWipBatch (x : WipBatch) log = log |> propData "events" (Enum.Events x) + let propDataProjections = Enum.Projections >> propData "projections" let withLoggedRetries<'t> retryPolicy (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> = match retryPolicy with @@ -308,34 +283,62 @@ module Log = let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("cosmosEvt", ScalarValue(value))) log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt }) let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length - let (|EventLen|) (x: #Store.IEvent) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes+metaBytes + let (|EventLen|) (x: #IEvent) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes+metaBytes let (|BatchLen|) = Seq.sumBy (|EventLen|) -[] -type EqxSyncResult = - | Written of Store.Position - | Conflict of Store.Position * events: Store.IOrderedEvent[] - | ConflictUnknown of Store.Position +[] +module private DocDb = + /// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions + let (|AggregateException|) (exn : exn) = + let rec aux (e : exn) = + match e with + | :? AggregateException as agg when agg.InnerExceptions.Count = 1 -> + aux agg.InnerExceptions.[0] + | _ -> e -// NB don't nest in a private module, or serialization will fail miserably ;) -[] -type WriteResponse = { etag: string; nextI: int64; conflicts: Store.BatchEvent[] } + aux exn + /// DocumentDB Error HttpStatusCode extractor + let (|DocDbException|_|) (e : exn) = + match e with + | AggregateException (:? DocumentClientException as dce) -> Some dce + | _ -> None + /// Map Nullable to Option + let (|HasValue|Null|) (x:Nullable<_>) = + if x.HasValue then HasValue x.Value + else Null + /// DocumentDB Error HttpStatusCode extractor + let (|DocDbStatusCode|_|) (e : DocumentClientException) = + match e.StatusCode with + | HasValue x -> Some x + | Null -> None -module private Write = - let mkBatch (stream: Store.Stream) (events: Store.IEvent[]) projections : Store.WipBatch = - { p = stream.name; id = Store.WipBatch.WellKnownDocumentId; i = -1L(*Server-managed*); _etag = null - e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |] - c = Array.ofSeq projections } - let mkProjections baseIndex (projectionEvents: Store.IEvent seq) : Store.Projection seq = - projectionEvents |> Seq.mapi (fun i x -> { i = baseIndex + int64 i; t = x.EventType; d = x.Data; m = x.Meta } : Store.Projection) - let [] sprocName = "EquinoxPagedWrite47" // NB need to renumber for any breaking change + type ReadResult<'T> = Found of 'T | NotFound | NotModified + type DocDbCollection(client : IDocumentClient, collectionUri) = + member __.TryReadDocument(documentId : string, ?options : Client.RequestOptions): Async> = async { + let! ct = Async.CancellationToken + let options = defaultArg options null + let docLink = sprintf "%O/docs/%s" collectionUri documentId + try let! document = async { return! client.ReadDocumentAsync<'T>(docLink, options = options, cancellationToken = ct) |> Async.AwaitTaskCorrect } + if document.StatusCode = System.Net.HttpStatusCode.NotModified then return document.RequestCharge, NotModified + // NB `.Document` will NRE if a IfNoneModified precondition triggers a NotModified result + else return document.RequestCharge, Found document.Document + with + | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.NotFound as e) -> return e.RequestCharge, NotFound + // NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens + | DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified } + +module Sync = + // NB don't nest in a private module, or serialization will fail miserably ;) + [] + type SyncResponse = { etag: string; nextI: int64; conflicts: BatchEvent[] } + let [] sprocName = "EquinoxSync001" // NB need to renumber for any breaking change let [] sprocBody = """ // Manages the merging of the supplied Request Batch, fulfilling one of the following end-states // 1 Verify no current WIP batch, the incoming `req` becomes the WIP batch (the caller is entrusted to provide a valid and complete set of inputs, or it's GIGO) // 2 Current WIP batch has space to accommodate the incoming projections (req.c) and events (req.e) - merge them in, replacing any superseded projections // 3. Current WIP batch would become too large - remove WIP state from active document by replacing the well known id with a correct one; proceed as per 1 -function pagedWrite(req, expectedVersion, maxEvents) { +function sync(req, expectedVersion, maxEvents) { if (!req) throw new Error("Missing req argument"); const collection = getContext().getCollection(); const collectionLink = collection.getSelfLink(); @@ -406,27 +409,33 @@ function pagedWrite(req, expectedVersion, maxEvents) { } }""" - let private run (client: IDocumentClient) (stream: Store.Stream) (expectedVersion: int64 option, req: WipBatch, maxEvents: int) - : Async = async { + [] + type Result = + | Written of Position + | Conflict of Position * events: IOrderedEvent[] + | ConflictUnknown of Position + + let private run (client: IDocumentClient) (stream: Stream) (expectedVersion: int64 option, req: WipBatch, maxEvents: int) + : Async = async { let sprocLink = sprintf "%O/sprocs/%s" stream.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(stream.name)) let! ct = Async.CancellationToken let ev = match expectedVersion with Some ev -> Position.FromIndexOnly ev | None -> Position.FromAppendAtEnd - let! (res : Client.StoredProcedureResponse) = + let! (res : Client.StoredProcedureResponse) = client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box req, box ev.index, box maxEvents) |> Async.AwaitTaskCorrect let newPos = { index = res.Response.nextI; etag = Option.ofObj res.Response.etag } return res.RequestCharge, res.Response.conflicts |> function - | null -> EqxSyncResult.Written newPos - | [||] when newPos.index = 0L -> EqxSyncResult.Conflict (newPos, Array.empty) - | [||] -> EqxSyncResult.ConflictUnknown newPos - | xs -> EqxSyncResult.Conflict (newPos, Store.Enum.Events (ev.index, xs) |> Array.ofSeq) } + | null -> Result.Written newPos + | [||] when newPos.index = 0L -> Result.Conflict (newPos, Array.empty) + | [||] -> Result.ConflictUnknown newPos + | xs -> Result.Conflict (newPos, Enum.Events (ev.index, xs) |> Array.ofSeq) } - let private logged client (stream: Store.Stream) (expectedVersion, req: WipBatch, maxEvents) (log : ILogger) - : Async = async { + let private logged client (stream: Stream) (expectedVersion, req: WipBatch, maxEvents) (log : ILogger) + : Async = async { let verbose = log.IsEnabled Events.LogEventLevel.Debug - let log = if verbose then log |> Log.propEvents (Store.Enum.Events req) |> Log.propDataProjections req.c else log - let (Log.BatchLen bytes), count = Store.Enum.Events req, req.e.Length + let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataProjections req.c else log + let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length let log = log |> Log.prop "bytes" bytes let writeLog = log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion @@ -436,33 +445,74 @@ function pagedWrite(req, expectedVersion, maxEvents) { let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } let logConflict () = writeLog.Information("Eqx TrySync Conflict writing {eventTypes}", [| for x in req.e -> x.t |]) match result with - | EqxSyncResult.Written pos -> + | Result.Written pos -> log |> Log.event (Log.WriteSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos - | EqxSyncResult.ConflictUnknown pos -> + | Result.ConflictUnknown pos -> logConflict () log |> Log.event (Log.WriteConflict (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos |> Log.prop "conflict" true - | EqxSyncResult.Conflict (pos, xs) -> + | Result.Conflict (pos, xs) -> logConflict () let log = if verbose then log |> Log.prop "nextExpectedVersion" pos |> Log.propData "conflicts" xs else log log |> Log.event (Log.WriteResync(mkMetric ru)) |> Log.prop "conflict" true resultLog.Information("Eqx {action:l} {count}+{pcount} {ms}ms rc={ru}", "Write", req.e.Length, req.c.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru) return result } - let batch (log : ILogger) retryPolicy client pk batch: Async = + let batch (log : ILogger) retryPolicy client pk batch: Async = let call = logged client pk batch Log.withLoggedRetries retryPolicy "writeAttempt" call log + let mkBatch (stream: Events.Stream) (events: IEvent[]) projections : WipBatch = + { p = stream.name; id = Store.WipBatch.WellKnownDocumentId; i = -1L(*Server-managed*); _etag = null + e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |] + c = Array.ofSeq projections } + let mkProjections baseIndex (projectionEvents: IEvent seq) : Store.Projection seq = + projectionEvents |> Seq.mapi (fun i x -> { i = baseIndex + int64 i; t = x.EventType; d = x.Data; m = x.Meta } : Store.Projection) -open Store + module Initialization = + open System.Collections.ObjectModel + let createDatabase (client:IDocumentClient) dbName = async { + let opts = Client.RequestOptions(ConsistencyLevel = Nullable ConsistencyLevel.Session) + let! db = client.CreateDatabaseIfNotExistsAsync(Database(Id=dbName), options = opts) |> Async.AwaitTaskCorrect + return db.Resource.Id } + + let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async { + let pkd = PartitionKeyDefinition() + pkd.Paths.Add(sprintf "/%s" Store.Batch.PartitionKeyField) + let colld = DocumentCollection(Id = collName, PartitionKey = pkd) + + colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent + colld.IndexingPolicy.Automatic <- true + // Can either do a blacklist or a whitelist + // Given how long and variable the blacklist would be, we whitelist instead + colld.IndexingPolicy.ExcludedPaths <- Collection [|ExcludedPath(Path="/*")|] + // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors + colld.IndexingPolicy.IncludedPaths <- Collection [| for k in Store.Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |] + let! coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect + return coll.Resource.Id } + + let createProc (log: ILogger) (client: IDocumentClient) (collectionUri: Uri) = async { + let def = new StoredProcedure(Id = sprocName, Body = sprocBody) + log.Information("Creating stored procedure {sprocId}", def.Id) + // TODO ifnotexist semantics + return! client.CreateStoredProcedureAsync(collectionUri, def) |> Async.AwaitTaskCorrect |> Async.Ignore } + + let initialize log (client : IDocumentClient) dbName collName ru = async { + let! dbId = createDatabase client dbName + let dbUri = Client.UriFactory.CreateDatabaseUri dbId + let! collId = createCollection client dbUri collName ru + let collUri = Client.UriFactory.CreateDocumentCollectionUri (dbName, collId) + //let! _aux = createAux client dbUri collName auxRu + return! createProc log client collUri + } module private Read = - let private getIndex (client: IDocumentClient) (stream: Store.Stream, maybePos: Store.Position option) = + let private getIndex (client: IDocumentClient) (stream: Stream, maybePos: Position option) = let coll = DocDbCollection(client, stream.collectionUri) let ac = match maybePos with Some { etag=Some etag } -> Client.AccessCondition(Type=Client.AccessConditionType.IfNoneMatch, Condition=etag) | _ -> null let ro = Client.RequestOptions(PartitionKey=PartitionKey(stream.name), AccessCondition = ac) - coll.TryReadDocument(Store.WipBatch.WellKnownDocumentId, ro) - let private loggedGetIndex (getIndex : Store.Stream * Store.Position option -> Async<_>) (stream: Store.Stream, maybePos: Store.Position option) (log: ILogger) = async { + coll.TryReadDocument(WipBatch.WellKnownDocumentId, ro) + let private loggedGetIndex (getIndex : Stream * Position option -> Async<_>) (stream: Stream, maybePos: Position option) (log: ILogger) = async { let log = log |> Log.prop "stream" stream.name - let! t, (ru, res : ReadResult) = getIndex (stream,maybePos) |> Stopwatch.Time + let! t, (ru, res : ReadResult) = getIndex (stream,maybePos) |> Stopwatch.Time let log count bytes (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }) match res with | ReadResult.NotModified -> @@ -471,14 +521,14 @@ module private Read = (log 0 0 Log.IndexNotFound).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.Found doc -> let log = - let (Log.BatchLen bytes), count = Store.Enum.Projections doc.c, doc.c.Length + let (Log.BatchLen bytes), count = Enum.Projections doc.c, doc.c.Length log bytes count Log.Index let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataProjections doc.c |> Log.prop "etag" doc._etag log.Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru) return ru, res } - type [] IndexResult = NotModified | NotFound | Found of Store.Position * Store.IOrderedEvent[] + type [] IndexResult = NotModified | NotFound | Found of Position * IOrderedEvent[] /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.UnChanged - let tryLoadIndex (log : ILogger) retryPolicy client (stream: Store.Stream, maybePos: Store.Position option): Async = async { + let tryLoadIndex (log : ILogger) retryPolicy client (stream: Stream, maybePos: Position option): Async = async { let getIndex = getIndex client let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGetIndex getIndex (stream,maybePos)) log match res with @@ -486,10 +536,10 @@ module private Read = | ReadResult.NotFound -> return IndexResult.NotFound | ReadResult.Found doc -> let pos' = Position.FromDocument(doc.i + int64 doc.e.Length, doc._etag) - return IndexResult.Found (pos', Store.Enum.EventsAndProjections doc |> Array.ofSeq) } + return IndexResult.Found (pos', Enum.EventsAndProjections doc |> Array.ofSeq) } open Microsoft.Azure.Documents.Linq - let private genBatchesQuery (client : IDocumentClient) (stream: Store.Stream, pos: Store.Position option) (direction: Direction) batchSize = + let private genBatchesQuery (client : IDocumentClient) (stream: Stream, pos: Position option) (direction: Direction) batchSize = let querySpec = match pos with | None -> SqlQuerySpec("SELECT * FROM c ORDER BY c.i " + if direction = Direction.Forward then "ASC" else "DESC") @@ -497,15 +547,15 @@ module private Read = let f = if direction = Direction.Forward then "c.i >= @id ORDER BY c.i ASC" else "c.i < @id ORDER BY c.i DESC" SqlQuerySpec( "SELECT * FROM c WHERE " + f, SqlParameterCollection [SqlParameter("@id", p.index)]) let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable batchSize) - client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() + client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() // Unrolls the Batches in a response - note when reading backawards, the events are emitted in reverse order of index - let private handleSlice (stream: Store.Stream, startPos: Store.Position option) direction (query: IDocumentQuery) (log: ILogger) - : Async = async { + let private handleSlice (stream: Stream, startPos: Position option) direction (query: IDocumentQuery) (log: ILogger) + : Async = async { let! ct = Async.CancellationToken - let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time + let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let batches, ru = Array.ofSeq res, res.RequestCharge - let events = batches |> Seq.collect Store.Enum.Events |> Array.ofSeq + let events = batches |> Seq.collect Enum.Events |> Array.ofSeq if direction = Direction.Backward then Array.Reverse events // NB no Seq.rev in old FSharp.Core let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } @@ -517,14 +567,14 @@ module private Read = "Query", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) // TODO we should be able to trap the etag from the -1 document with some tweaking let todoEtag = null - let pos = match index with HasValue i -> Store.Position.FromDocument(i,todoEtag) |> Some | Null -> None + let pos = match index with HasValue i -> Position.FromDocument(i,todoEtag) |> Some | Null -> None return events, pos, ru } - let private runBatchesQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) + let private runBatchesQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) (maxPermittedBatchReads: int option) - (query: IDocumentQuery) - : AsyncSeq = - let rec loop batchCount : AsyncSeq = asyncSeq { + (query: IDocumentQuery) + : AsyncSeq = + let rec loop batchCount : AsyncSeq = asyncSeq { match maxPermittedBatchReads with | Some mpbr when batchCount >= mpbr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" | _ -> () @@ -536,7 +586,7 @@ module private Read = yield! loop (batchCount + 1) } loop 0 - let logBatchRead direction batchSize streamName interval (responsesCount, events : Store.IOrderedEvent []) nextI (ru: float) (log : ILogger) = + let logBatchRead direction batchSize streamName interval (responsesCount, events : IOrderedEvent []) nextI (ru: float) (log : ILogger) = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let action = match direction with Direction.Forward -> "LoadF" | Direction.Backward -> "LoadB" @@ -545,12 +595,12 @@ module private Read = "Eqx {action:l} {stream} v{nextI} {count}/{responses} {ms}ms rc={ru}", action, streamName, Option.toNullable nextI, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) - let loadFrom (log : ILogger) retryPolicy client direction batchSize maxPermittedBatchReads (stream: Store.Stream, pos: Store.Position) - : Async = async { + let loadFrom (log : ILogger) retryPolicy client direction batchSize maxPermittedBatchReads (stream: Stream, pos: Position) + : Async = async { let mutable ru = 0.0 let mutable responses = 0 - let mergeBatches (batches: AsyncSeq) = async { - let! (events : Store.IOrderedEvent[]) = + let mergeBatches (batches: AsyncSeq) = async { + let! (events : IOrderedEvent[]) = batches |> AsyncSeq.map (fun (events, _maybePos, r) -> ru <- ru + r; responses <- responses + 1; events) |> AsyncSeq.concatSeq @@ -560,38 +610,38 @@ module private Read = let pullSlice = handleSlice (stream,Some pos) direction let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) let log = log |> Log.prop "batchSize" batchSize |> Log.prop "direction" direction |> Log.prop "stream" stream.name - let slices : AsyncSeq = runBatchesQuery log retryingLoggingReadSlice maxPermittedBatchReads query + let slices : AsyncSeq = runBatchesQuery log retryingLoggingReadSlice maxPermittedBatchReads query let! t, (events, ru) = mergeBatches slices |> Stopwatch.Time query.Dispose() return t, responses, events, ru } - let loadForwardsFrom (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads (stream: Store.Stream, pos: Store.Position) - : Async = async { + let loadForwardsFrom (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads (stream: Stream, pos: Position) + : Async = async { let direction = Direction.Forward let! t, responses, events, ru = loadFrom log retryPolicy client direction batchSize maxPermittedBatchReads (stream,pos) let nextI = if events.Length = 0 then 0L else events.[events.Length-1].Index+1L // No Array.tryLast in older FSharp.Core log |> logBatchRead direction batchSize stream.name t (responses,events) (Some nextI) ru return { pos with index = nextI }, events } - let loadBackwardsFrom (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads (stream: Store.Stream, pos: Store.Position) - : Async = async { + let loadBackwardsFrom (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads (stream: Stream, pos: Position) + : Async = async { let direction = Direction.Backward let! t, responses, events, ru = loadFrom log retryPolicy client direction batchSize maxPermittedBatchReads (stream,pos) log |> logBatchRead direction batchSize stream.name t (responses,events) None ru return events } - let calculateUsedVersusDroppedPayload firstUsedEventIndex (xs: Store.IOrderedEvent[]) : int * int = + let calculateUsedVersusDroppedPayload firstUsedEventIndex (xs: IOrderedEvent[]) : int * int = let mutable used, dropped = 0, 0 for x in xs do let (Log.EventLen bytes) = x if x.Index >= firstUsedEventIndex then used <- used + bytes else dropped <- dropped + bytes used, dropped - let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads isCompactionEvent (stream: Store.Stream) - : Async = async { + let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy client batchSize maxPermittedBatchReads isCompactionEvent (stream: Stream) + : Async = async { let mutable responseCount = 0 - let mergeFromCompactionPointOrStartFromBackwardsStream (log : ILogger) (batchesBackward : AsyncSeq) - : Async = async { + let mergeFromCompactionPointOrStartFromBackwardsStream (log : ILogger) (batchesBackward : AsyncSeq) + : Async = async { let mutable lastSlice = None let mutable maybeFirstPos = None let mutable ru = 0.0 @@ -614,14 +664,14 @@ module private Read = false) |> AsyncSeq.toArrayAsync let eventsForward = Array.Reverse(tempBackward); tempBackward // sic - relatively cheap, in-place reverse of something we own - return eventsForward, (match maybeFirstPos with Some pos -> pos | None -> Store.Position.FromEmptyStream()), ru } + return eventsForward, (match maybeFirstPos with Some pos -> pos | None -> Position.FromEmptyStream), ru } let direction = Direction.Backward use query = genBatchesQuery client (stream,None) direction batchSize let pullSlice = handleSlice (stream,None) direction let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" stream.name let readlog = log |> Log.prop "direction" direction - let batchesBackward : AsyncSeq = runBatchesQuery readlog retryingLoggingReadSlice maxPermittedBatchReads query + let batchesBackward : AsyncSeq = runBatchesQuery readlog retryingLoggingReadSlice maxPermittedBatchReads query let! t, (events, pos, ru) = mergeFromCompactionPointOrStartFromBackwardsStream log batchesBackward |> Stopwatch.Time query.Dispose() let version = if events.Length = 0 then 0L else events.[events.Length-1].Index+1L // No Array.tryLast in older FSharp.Core // the merge put them in order so this is correct @@ -629,17 +679,17 @@ module private Read = return pos, events } module UnionEncoderAdapters = - let private mkEvent (x : UnionCodec.EncodedUnion) : Store.IEvent = - { new Store.IEvent with + let private mkEvent (x : UnionCodec.EncodedUnion) : IEvent = + { new IEvent with member __.EventType = x.caseName member __.Data = x.payload member __.Meta = null } - let encodeEvent (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (x : 'event) : Store.IEvent = + let encodeEvent (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (x : 'event) : IEvent = codec.Encode x |> mkEvent - let decodeKnownEvents (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (xs : Store.IOrderedEvent seq) : 'event seq = + let decodeKnownEvents (codec : UnionCodec.IUnionEncoder<'event, byte[]>) (xs : IOrderedEvent seq) : 'event seq = xs |> Seq.choose (fun x -> codec.TryDecode { caseName = x.EventType; payload = x.Data }) -type []Token = { stream: Store.Stream; pos: Store.Position; rollingSnapshotEventIndex: int64 option; batchCapacityLimit: int option } +type []Token = { stream: Stream; pos: Position; rollingSnapshotEventIndex: int64 option; batchCapacityLimit: int option } module Token = let private create rollingSnapshotEventIndex batchCapacityLimit stream pos : Storage.StreamToken = @@ -648,7 +698,7 @@ module Token = let ofNonCompacting (stream,pos) : Storage.StreamToken = create None None stream pos // headroom before compaction is necessary given the stated knowledge of the last (if known) `rollingSnapshotEventIndexption` - let private batchCapacityLimit maybeSnapshotEventIndex unstoredEventsPending (windowSize : int) (pos : Store.Position) : int = + let private batchCapacityLimit maybeSnapshotEventIndex unstoredEventsPending (windowSize: int) (pos: Position) : int = match maybeSnapshotEventIndex with | Some (rollingSnapshotEventIndex : int64) -> (windowSize - unstoredEventsPending) - int (pos.index - rollingSnapshotEventIndex + 1L) |> max 0 | None -> (windowSize - unstoredEventsPending) - int pos.index |> max 0 @@ -667,7 +717,7 @@ module Token = let rollingSnapshotEventIndexOption = previousToken.rollingSnapshotEventIndex ofRollingSnapshotEventIndex rollingSnapshotEventIndexOption 0 batchSize pos /// Use an event just read from the stream to infer headroom - let ofCompactionResolvedEventAndVersion (compactionEvent: Store.IOrderedEvent) batchSize pos : Storage.StreamToken = + let ofCompactionResolvedEventAndVersion (compactionEvent: IOrderedEvent) batchSize pos : Storage.StreamToken = ofRollingSnapshotEventIndex (Some compactionEvent.Index) 0 batchSize pos /// Use an event we are about to write to the stream to infer headroom let ofPreviousStreamVersionAndCompactionEventDataIndex prevPos compactionEventDataIndex eventsLength batchSize streamVersion' : Storage.StreamToken = @@ -677,70 +727,109 @@ module Token = let currentETag, newETag = current.pos.etag, x.pos.etag newVersion > currentVersion || currentETag <> newETag +namespace Equinox.Cosmos.Builder + +open Equinox +open Equinox.Cosmos.Events // NB needs to be shadow by Equinox.Cosmos +open Equinox.Cosmos +open Equinox.Store.Infrastructure +open FSharp.Control +open Microsoft.Azure.Documents +open Serilog +open System + +[] +module Internal = + [] + type InternalSyncResult = Written of Storage.StreamToken | ConflictUnknown of Storage.StreamToken | Conflict of Storage.StreamToken * IOrderedEvent[] + + [] + type LoadFromTokenResult = Unchanged | Found of Storage.StreamToken * IOrderedEvent[] + + [] + type SearchStrategy<'event> = + | EventType of string + | Predicate of ('event -> bool) + +/// Defines the policies in force for retrying with regard to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) type EqxConnection(client: IDocumentClient, ?readRetryPolicy (*: (int -> Async<'T>) -> Async<'T>*), ?writeRetryPolicy) = member __.Client = client member __.ReadRetryPolicy = readRetryPolicy member __.WriteRetryPolicy = writeRetryPolicy member __.Close = (client :?> Client.DocumentClient).Dispose() -type EqxBatchingPolicy(getMaxBatchSize : unit -> int, ?batchCountLimit, ?maxEventsPerSlice) = - new (maxBatchSize) = EqxBatchingPolicy(fun () -> maxBatchSize) - member __.BatchSize = getMaxBatchSize() - member __.MaxBatches = batchCountLimit +/// Defines the policies in force regarding how to a) split up calls b) limit the number of events per slice +type EqxBatchingPolicy + ( // Max items to request in query response. Defaults to 10. + ?defaultMaxSlices : int, + // Dynamic version of `defaultMaxSlices`, allowing one to react to dynamic configuration changes. Default to using `defaultMaxSlices` + ?getDefaultMaxSlices : unit -> int, + /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited. + ?maxCalls, + /// Maximum number of events to accumualte within the `WipBatch` before switching to a new one when adding Events. Defaults to 10. + ?maxEventsPerSlice) = + let getDefaultMaxSlices = defaultArg getDefaultMaxSlices (fun () -> defaultArg defaultMaxSlices 10) + /// Limit for Maximum number of `Batch` records in a single query batch response + member __.MaxSlices = getDefaultMaxSlices () + /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices` + member __.MaxCalls = maxCalls + /// Maximum number of events to accumualte within the `WipBatch` before switching to a new one when adding Events member __.MaxEventsPerSlice = defaultArg maxEventsPerSlice 10 -[] -type GatewaySyncResult = Written of Storage.StreamToken | ConflictUnknown of Storage.StreamToken | Conflict of Storage.StreamToken * Store.IOrderedEvent[] - -[] -type LoadFromTokenResult = Unchanged | Found of Storage.StreamToken * Store.IOrderedEvent[] - type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = - let (|EventTypePredicate|) predicate (x:Store.IOrderedEvent) = predicate x.EventType - let (|IEventDataArray|) events = [| for e in events -> e :> Store.IOrderedEvent |] - member __.LoadForward log batchingOverride maybeRollingSnapshotPredicate (stream: Store.Stream, pos: Store.Position) - : Async = async { + let (|EventTypePredicate|) predicate (x:IOrderedEvent) = predicate x.EventType + let (|IEventDataArray|) events = [| for e in events -> e :> IOrderedEvent |] + member __.LoadForward log batchingOverride maybeRollingSnapshotPredicate (stream: Stream, pos: Position) + : Async = async { let batching = defaultArg batchingOverride batching - let! pos, events = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.Client batching.BatchSize batching.MaxBatches (stream,pos) + let! pos, events = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.Client batching.MaxSlices batching.MaxCalls (stream,pos) match maybeRollingSnapshotPredicate with | None -> return Token.ofNonCompacting (stream,pos), events | Some (EventTypePredicate isCompactionEvent) -> match events |> Array.tryFindBack isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.BatchSize (stream,pos), events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize (stream,pos), events } - member __.LoadBackward log batchingOverride (stream: Store.Stream, pos: Store.Position) - : Async = async { + | None -> return Token.ofUncompactedVersion batching.MaxSlices (stream,pos), events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.MaxSlices (stream,pos), events } + member __.LoadBackward log batchingOverride (stream: Stream, pos: Position) + : Async = async { let batching = defaultArg batchingOverride batching - return! Read.loadBackwardsFrom log conn.ReadRetryPolicy conn.Client batching.BatchSize batching.MaxBatches (stream,pos) } + return! Read.loadBackwardsFrom log conn.ReadRetryPolicy conn.Client batching.MaxSlices batching.MaxCalls (stream,pos) } member __.LoadBackwardsStoppingAtCompactionEvent log (EventTypePredicate isCompactionEvent) stream - : Async = async { - let! pos, events = Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.Client batching.BatchSize batching.MaxBatches isCompactionEvent stream + : Async = async { + let! pos, events = Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.Client batching.MaxSlices batching.MaxCalls isCompactionEvent stream match Array.tryHead events |> Option.filter isCompactionEvent with - | None -> return Token.ofUncompactedVersion batching.BatchSize (stream,pos), events - | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize (stream,pos), events } + | None -> return Token.ofUncompactedVersion batching.MaxSlices (stream,pos), events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.MaxSlices (stream,pos), events } member private __.InterpretIndexOrFallback log (EventTypePredicate isRelevantProjectionOrRollingSnapshot as etp) stream res - : Async = async { + : Async = async { match res with | Read.IndexResult.NotModified -> return invalidOp "Not handled" | Read.IndexResult.Found (pos, projectionsAndEvents) when projectionsAndEvents |> Array.exists isRelevantProjectionOrRollingSnapshot -> return Token.ofNonCompacting (stream,pos), projectionsAndEvents | _ -> return! __.LoadBackwardsStoppingAtCompactionEvent log etp stream } member __.IndexedOrBatched log isCompactionEventType (stream,maybePos) - : Async = async { + : Async = async { let! res = Read.tryLoadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client (stream,maybePos) return! __.InterpretIndexOrFallback log isCompactionEventType stream res } + member __.GetPosition(log, stream, ?pos) + : Async = async { + let! res = Read.tryLoadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client (stream,pos) + match res with + | Read.IndexResult.NotFound -> return Token.ofNonCompacting(stream,Position.FromEmptyStream) + | Read.IndexResult.NotModified -> return Token.ofNonCompacting (stream, pos.Value) + | Read.IndexResult.Found (pos, _projectionsAndEvents) -> return Token.ofNonCompacting (stream,pos) } member __.LoadFromToken log (Token.Unpack token as streamToken) maybeRollingSnapshotOrProjectionPredicate tryIndex : Async = async { let ok r = LoadFromTokenResult.Found r if not tryIndex then - let! pos, ((IEventDataArray xs) as events) = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.Client batching.BatchSize batching.MaxBatches (token.stream,token.pos) + let! pos, ((IEventDataArray xs) as events) = + Read.loadForwardsFrom log conn.ReadRetryPolicy conn.Client batching.MaxSlices batching.MaxCalls (token.stream,token.pos) let ok t = ok (t,xs) match maybeRollingSnapshotOrProjectionPredicate with | None -> return ok (Token.ofNonCompacting (token.stream,token.pos)) | Some (EventTypePredicate isCompactionEvent) -> match events |> Array.tryFindBack isCompactionEvent with - | None -> return ok (Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize (token.stream,token.pos)) - | Some resolvedEvent -> return ok (Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize (token.stream,pos)) + | None -> return ok (Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.MaxSlices (token.stream,token.pos)) + | Some resolvedEvent -> return ok (Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.MaxSlices (token.stream,pos)) else let! res = Read.tryLoadIndex log None(* TODO conn.ReadRetryPolicy*) conn.Client (token.stream,Some token.pos) match res with @@ -749,46 +838,38 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = | _ -> let! loaded = __.InterpretIndexOrFallback log maybeRollingSnapshotOrProjectionPredicate.Value token.stream res return ok loaded } - member __.TrySync log (Token.Unpack token as streamToken) maybeRollingSnapshotPredicate (expectedVersion,batch: WipBatch): Async = async { - let! wr = Write.batch log conn.WriteRetryPolicy conn.Client token.stream (expectedVersion,batch,batching.MaxEventsPerSlice) + member __.TrySync log (Token.Unpack token as streamToken) maybeRollingSnapshotPredicate (expectedVersion, batch: Store.WipBatch) + : Async = async { + let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client token.stream (expectedVersion,batch,batching.MaxEventsPerSlice) match wr with - | EqxSyncResult.Conflict (pos',events) -> - return GatewaySyncResult.Conflict (Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize (token.stream,pos'),events) - | EqxSyncResult.ConflictUnknown pos' -> - return GatewaySyncResult.ConflictUnknown (Token.ofPreviousTokenWithUpdatedPosition streamToken batching.BatchSize (token.stream,pos')) - | EqxSyncResult.Written pos' -> + | Sync.Result.Conflict (pos',events) -> + return InternalSyncResult.Conflict (Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.MaxSlices (token.stream,pos'),events) + | Sync.Result.ConflictUnknown pos' -> + return InternalSyncResult.ConflictUnknown (Token.ofPreviousTokenWithUpdatedPosition streamToken batching.MaxSlices (token.stream,pos')) + | Sync.Result.Written pos' -> let token = match maybeRollingSnapshotPredicate with | None -> Token.ofNonCompacting (token.stream,pos') | Some isCompactionEvent -> match batch.e |> Array.tryFindIndexBack (fun x -> isCompactionEvent x.t) with - | None -> Token.ofPreviousTokenAndEventsLength streamToken batch.e.Length batching.BatchSize (token.stream,pos') + | None -> Token.ofPreviousTokenAndEventsLength streamToken batch.e.Length batching.MaxSlices (token.stream,pos') | Some compactionEventIndex -> - Token.ofPreviousStreamVersionAndCompactionEventDataIndex token.pos compactionEventIndex batch.e.Length batching.BatchSize (token.stream,pos') - return GatewaySyncResult.Written token } + Token.ofPreviousStreamVersionAndCompactionEventDataIndex token.pos compactionEventIndex batch.e.Length batching.MaxSlices (token.stream,pos') + return InternalSyncResult.Written token } -type private Collection(gateway : EqxGateway, databaseId, collectionId) = +type EqxCollection(gateway : EqxGateway, databaseId, collectionId) = member __.Gateway = gateway member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId) -[] -type SearchStrategy<'event> = - | EventType of string - | Predicate of ('event -> bool) - -[] -type AccessStrategy<'event,'state> = - | EventsAreState - | //[] - RollingSnapshots of eventType: string * compact: ('state -> 'event) - | IndexedSearch of (string -> bool) * index: ('state -> 'event seq) - type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = /// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes) member __.IsCompactionDue = eventsLen > capacityBeforeCompaction -type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?access : AccessStrategy<'event,'state>) = +type private Category<'event, 'state> + ( coll : EqxCollection, + codec : UnionCodec.IUnionEncoder<'event, byte[]>, + ?access : AccessStrategy<'event,'state>) = let compactionPredicate = match access with | None -> None @@ -800,8 +881,9 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni let load (fold: 'state -> 'event seq -> 'state) initial loadF = async { let! token, events = loadF return response fold initial token events } - member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) : Async = - let stream = Store.Stream.Create(coll.CollectionUri, streamName) + member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) + : Async = + let stream = Stream.Create(coll.CollectionUri, streamName) let forward = load fold initial (coll.Gateway.LoadForward log None None (stream,Position.FromIndexOnly 0L)) let compacted predicate = load fold initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate stream) let indexed predicate = load fold initial (coll.Gateway.IndexedOrBatched log predicate (stream,None)) @@ -819,7 +901,8 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni | LoadFromTokenResult.Found (token,events ) -> return response fold initial token events } member __.TrySync (fold: 'state -> 'event seq -> 'state) initial (log : ILogger) (Token.Unpack token as streamToken, expectedVersion : int64 option, state : 'state) - (events : 'event list, state' : 'state) : Async> = async { + (events : 'event list, state' : 'state) + : Async> = async { let eventsIncludingSnapshots, projections = match access with | None | Some AccessStrategy.EventsAreState -> @@ -832,13 +915,13 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni let encode = UnionEncoderAdapters.encodeEvent codec let encodedEvents, projections = Seq.map encode events |> Array.ofSeq, Seq.map encode projections let baseIndex = token.pos.index + int64 events.Length - let projections = Write.mkProjections baseIndex projections - let batch = Write.mkBatch token.stream encodedEvents projections + let projections = Sync.mkProjections baseIndex projections + let batch = Sync.mkBatch token.stream encodedEvents projections let! syncRes = coll.Gateway.TrySync log streamToken compactionPredicate (expectedVersion,batch) match syncRes with - | GatewaySyncResult.Conflict (token',events) -> return Storage.SyncResult.Conflict (async { return response fold initial token' events }) - | GatewaySyncResult.ConflictUnknown token' -> return Storage.SyncResult.Conflict (__.LoadFromToken fold initial state token' log) - | GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state eventsIncludingSnapshots) } + | InternalSyncResult.Conflict (token',events) -> return Storage.SyncResult.Conflict (async { return response fold initial token' events }) + | InternalSyncResult.ConflictUnknown token' -> return Storage.SyncResult.Conflict (__.LoadFromToken fold initial state token' log) + | InternalSyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state eventsIncludingSnapshots) } module Caching = open System.Runtime.Caching @@ -925,7 +1008,7 @@ type CachingStrategy = type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial, ?access, ?caching) = member __.Create (databaseId, collectionId, streamName) : Equinox.IStream<'event, 'state> = - let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?access = access) + let category = Category<'event, 'state>(EqxCollection(gateway, databaseId, collectionId), codec, ?access = access) let readCacheOption = match caching with @@ -944,42 +1027,6 @@ type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial Equinox.Stream.create category streamName -module Initialization = - let createDatabase (client:IDocumentClient) dbName = async { - let opts = Client.RequestOptions(ConsistencyLevel = Nullable ConsistencyLevel.Session) - let! db = client.CreateDatabaseIfNotExistsAsync(Database(Id=dbName), options = opts) |> Async.AwaitTaskCorrect - return db.Resource.Id } - - let createCollection (client: IDocumentClient) (dbUri: Uri) collName ru = async { - let pkd = PartitionKeyDefinition() - pkd.Paths.Add(sprintf "/%s" Store.Batch.PartitionKeyField) - let colld = DocumentCollection(Id = collName, PartitionKey = pkd) - - colld.IndexingPolicy.IndexingMode <- IndexingMode.Consistent - colld.IndexingPolicy.Automatic <- true - // Can either do a blacklist or a whitelist - // Given how long and variable the blacklist would be, we whitelist instead - colld.IndexingPolicy.ExcludedPaths <- System.Collections.ObjectModel.Collection [|ExcludedPath(Path="/*")|] - // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors - colld.IndexingPolicy.IncludedPaths <- System.Collections.ObjectModel.Collection [| for k in Store.Batch.IndexedFields -> IncludedPath(Path=sprintf "/%s/?" k) |] - let! coll = client.CreateDocumentCollectionIfNotExistsAsync(dbUri, colld, Client.RequestOptions(OfferThroughput=Nullable ru)) |> Async.AwaitTaskCorrect - return coll.Resource.Id } - - let createProc (log: ILogger) (client: IDocumentClient) (collectionUri: Uri) = async { - let def = new StoredProcedure(Id = Write.sprocName, Body = Write.sprocBody) - log.Information("Creating stored procedure {sprocId}", def.Id) - // TODO ifnotexist semantics - return! client.CreateStoredProcedureAsync(collectionUri, def) |> Async.AwaitTaskCorrect |> Async.Ignore } - - let initialize log (client : IDocumentClient) dbName collName ru = async { - let! dbId = createDatabase client dbName - let dbUri = Client.UriFactory.CreateDatabaseUri dbId - let! collId = createCollection client dbUri collName ru - let collUri = Client.UriFactory.CreateDocumentCollectionUri (dbName, collId) - //let! _aux = createAux client dbUri collName auxRu - return! createProc log client collUri - } - [] type Discovery = | UriAndKey of databaseUri:Uri * key:string @@ -1055,90 +1102,155 @@ type EqxConnector let! conn = connect(name, discovery) return EqxConnection(conn, ?readRetryPolicy=readRetryPolicy, ?writeRetryPolicy=writeRetryPolicy) } -module Events = - /// Outcome of appending events, specifying the new and/or conflicting events, together with the updated Target write position - [] - type AppendResult = - | Ok of index: int64 - | Conflict of index: int64 * conflictingEvents: IOrderedEvent[] - | ConflictUnknown of index: int64 - - type Context(conn, databaseId, collectionId, batchSize, ?log, ?maxEventsPerSlice) = - let getDefaultMaxBatchSize () = batchSize - let batching = new EqxBatchingPolicy(getDefaultMaxBatchSize, ?maxEventsPerSlice=maxEventsPerSlice) - let gateway = EqxGateway(conn, batching) - let collection = Collection(gateway, databaseId, collectionId) - let logger : ILogger = defaultArg log (Log.ForContext(Serilog.Core.Constants.SourceContextPropertyName,collectionId)) - let (|Stream|) streamName = Store.Stream.Create(collection.CollectionUri, streamName) - let (|Position|) = Position.FromIndexOnly - let (|ExpectedPosition|) = function Some ei -> Position.FromIndexOnly ei | None -> Position.FromAppendAtEnd - let getInternal (Stream stream) (Position pos) batchSize direction = async { - let batching = new EqxBatchingPolicy(maxBatchSize=batchSize) - let! data = - match direction with - | Direction.Backward -> gateway.LoadBackward logger (Some batching) (stream,pos) - | Direction.Forward -> async { - let pos = Position.FromIndexOnly 0L - // TOCONSIDER provide a way to send out the token - let! (_token, data: Store.IOrderedEvent[]) = gateway.LoadForward logger (Some batching) None (stream,pos) - return data } - // TODO fix algorithm so we can pass a skipcount +namespace Equinox.Cosmos.Core + +open Equinox.Cosmos +open Equinox.Cosmos.Builder +open Equinox.Cosmos.Events +open FSharp.Control + +/// Outcome of appending events, specifying the new and/or conflicting events, together with the updated Target write position +[] +type AppendResult<'t> = + | Ok of pos: 't + | Conflict of index: 't * conflictingEvents: IOrderedEvent[] + | ConflictUnknown of index: 't + +/// Encapsulates the core facilites Equinox.Cosmos offers for operating directly on Events in Streams. +type EqxContext + ( /// Connection to CosmosDb with DocumentDb Transient Read and Write Retry policies + conn : EqxConnection, + /// Default Cosmos Database name for this context + databaseId, + /// Default Cosmos Collection name for this context + collectionId, + /// Logger to write to - see https://github.com/serilog/serilog/wiki/Provided-Sinks for how to wire to your logger + logger : Serilog.ILogger, + /// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice). + /// Defaults to 10 + ?defaultMaxSlices, + /// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered. + /// Defaults to 100 + ?maxEventsPerSlice, + /// Alternate way of specifying defaultMaxSlices which facilitates reading it from a cached dynamic configuration + ?getDefaultMaxSlices) = + let getDefaultMaxSlices = match getDefaultMaxSlices with Some f -> f | None -> fun () -> defaultArg defaultMaxSlices 10 + let batching = EqxBatchingPolicy(getDefaultMaxSlices=getDefaultMaxSlices, maxEventsPerSlice=defaultArg maxEventsPerSlice 100) + let gateway = EqxGateway(conn, batching) + + member __.CreateStream(streamName,?dbId,?collId) = + let collection = EqxCollection(gateway, defaultArg dbId databaseId, defaultArg collId collectionId) + Stream.Create(collection.CollectionUri, streamName) + + member internal __.GetInternal((stream,pos) as streamPos, ?batchSize, ?direction) = async { + let direction = defaultArg direction Direction.Forward + let batching = batchSize |> Option.map (fun max -> EqxBatchingPolicy(defaultMaxSlices=max)) + let! data = match direction with - | Direction.Backward -> return data |> Seq.skipWhile (fun e -> e.Index > pos.index) |> Array.ofSeq - | Direction.Forward -> return data |> Seq.skipWhile (fun e -> e.Index < pos.index) |> Array.ofSeq } - member __.GetAll(streamName, index, batchSize) : AsyncSeq = asyncSeq { - let! res = getInternal streamName index batchSize Direction.Forward - // TODO add laziness - return AsyncSeq.ofSeq res } - member __.Get(streamName, index, batchSize, ?direction) : Async = - getInternal streamName index batchSize (defaultArg direction Direction.Forward) - member __.Append(Stream stream, events:Store.IEvent[], (ExpectedPosition pos as maybeEv)) = async { - let token = Token.ofNonCompacting (stream,pos) - let batch = Write.mkBatch stream events Seq.empty - let! res = gateway.TrySync logger token None (maybeEv,batch) - match res with - // TOCONSIDER we should not be unpacking the token so callers can gain perf and stop assuming/worrying about sequences - | GatewaySyncResult.Written (Token.Unpack token) -> return AppendResult.Ok token.pos.index - | GatewaySyncResult.Conflict (Token.Unpack token,events) -> return AppendResult.Conflict (token.pos.index, events) - | GatewaySyncResult.ConflictUnknown (Token.Unpack token) -> return AppendResult.ConflictUnknown token.pos.index } + | Direction.Backward -> gateway.LoadBackward logger batching streamPos + | Direction.Forward -> async { + // TODO fix query so we can honor start position + let pos = Position.FromIndexOnly 0L + // TOCONSIDER provide a way to send out the token + let! (_token, data: IOrderedEvent[]) = gateway.LoadForward logger batching None (stream,pos) + return data } + // TODO fix algorithm so we don't need to do this + match direction with + | Direction.Backward -> return data |> Seq.skipWhile (fun e -> e.Index > (snd streamPos).index) |> Array.ofSeq + | Direction.Forward -> return data |> Seq.skipWhile (fun e -> e.Index < (snd streamPos).index) |> Array.ofSeq } + + /// Establishes the current position of the stream in as effficient a manner as possible + /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) + member __.Sync(stream, ?position: Position) : Async = async { + //let indexed predicate = load fold initial (coll.Gateway.IndexedOrBatched log predicate (stream,None)) + let! (Token.Unpack { pos = pos' }) = gateway.GetPosition(logger, stream, ?pos=position) + return pos' } + + /// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk way from a running query + member __.Walk(stream, position, batchSize, ?direction) : AsyncSeq = asyncSeq { + let! res = __.GetInternal((stream, position), batchSize, ?direction=direction) + // TODO add laziness + return AsyncSeq.ofSeq res } + + /// Reads all Events from a `Position` in a given `direction`, ideally + member __.Read(stream, position, ?batchSize, ?direction) : Async = + __.GetInternal((stream, position), ?batchSize = batchSize, ?direction=direction) + + /// Appends the supplied batch of events, subject to a consistency check based on the `position` + /// Callers should implement appropriate idempotent handling, or use Equinox.Handler for that purpose + member __.Sync(stream, position, events: IEvent[]) : Async> = async { + let token = Token.ofNonCompacting (stream,position) + let batch = Sync.mkBatch stream events Seq.empty + let! res = gateway.TrySync logger token None (Some position.index,batch) + match res with + | InternalSyncResult.Written (Token.Unpack token) -> return AppendResult.Ok token.pos + | InternalSyncResult.Conflict (Token.Unpack token,events) -> return AppendResult.Conflict (token.pos, events) + | InternalSyncResult.ConflictUnknown (Token.Unpack token) -> return AppendResult.ConflictUnknown token.pos } + + /// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play + /// NB Should be used sparingly; Equinox.Handler enables building equivalent equivalent idempotent handling with minimal code. + member __.NonIdempotentAppend(stream, events: IEvent[]) : Async = async { + let! res = __.Sync(stream, Position.FromAppendAtEnd, events) + match res with + | AppendResult.Ok token -> return token + | x -> return x |> sprintf "Conflict despite it being disabled %A" |> invalidOp } + +[] +/// Api as defined in the Equinox Specification +/// Note the EqxContext APIs can yield better performance due to the fact that a Position tracks the etag of the Stream's WipBatch +module Events = + let private (|PositionIndex|) (x: Position) = x.index + let private stripSyncResult (f: Async>): Async> = async { + let! res = f + match res with + | AppendResult.Ok (PositionIndex index)-> return AppendResult.Ok index + | AppendResult.Conflict (PositionIndex index,events) -> return AppendResult.Conflict (index, events) + | AppendResult.ConflictUnknown (PositionIndex index) -> return AppendResult.ConflictUnknown index } + let private stripPosition (f: Async): Async = async { + let! (PositionIndex index) = f + return index } /// Returns an async sequence of events in the stream starting at the specified sequence number, /// reading in batches of the specified size. /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest /// sequence number in the stream. - let getAll (ctx:Context) (streamName: string) (index: int64) (batchSize:int) : AsyncSeq = - ctx.GetAll(streamName, index, batchSize) + let getAll (ctx: EqxContext) (streamName: string) (index: int64) (batchSize: int): AsyncSeq = + ctx.Walk(ctx.CreateStream streamName, Position.FromIndexOnly index, batchSize) /// Returns an async array of events in the stream starting at the specified sequence number, /// number of events to read is specified by batchSize /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest /// sequence number in the stream. - let get (ctx:Context) (streamName: string) (index: int64) (batchSize:int) = - ctx.Get(streamName, index, batchSize) + let get (ctx: EqxContext) (streamName: string) (index: int64) (batchSize: int): Async = + ctx.Read(ctx.CreateStream streamName, Position.FromIndexOnly index, batchSize) /// Appends a batch of events to a stream at the specified expected sequence number. /// If the specified expected sequence number does not match the stream, the events are not appended /// and a failure is returned. - let append (ctx:Context) (streamName: string) (index: int64) (eds:Store.IEvent[]) = - ctx.Append(streamName, eds, Some index) + let append (ctx: EqxContext) (streamName: string) (index: int64) (events: IEvent[]): Async> = + ctx.Sync(ctx.CreateStream streamName, Position.FromIndexOnly index, events) |> stripSyncResult - let appendAtEnd (ctx:Context) (streamName: string) (eds:Store.IEvent[]) = - ctx.Append(streamName, eds, None) + /// Appends a batch of events to a stream at the the present Position without any conflict checks. + /// NB typically, it is recommended to ensure idempotency of operations by using the `append` and related API as + /// this facilitates ensuring consistency is maintained, and yields reduced latency and Request Charges impacts + /// (See equivalent APIs on `Context` that yield `Position` values) + let appendAtEnd (ctx: EqxContext) (streamName: string) (events: IEvent[]): Async = + ctx.NonIdempotentAppend(ctx.CreateStream streamName, events) |> stripPosition /// Returns an async sequence of events in the stream backwards starting from the specified sequence number, /// reading in batches of the specified size. /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getAllBackwards (conn:EqxConnection) (streamName: string) (index: int64) (batchSize:int): AsyncSeq = - failwith "TODO" + let getAllBackwards (ctx: EqxContext) (streamName: string) (index: int64) (batchSize: int): AsyncSeq = + ctx.Walk(ctx.CreateStream streamName, Position.FromIndexOnly index, batchSize, Direction.Backward) /// Returns an async array of events in the stream backwards starting from the specified sequence number, /// number of events to read is specified by batchSize /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getBackwards (ctx:Context) (streamName: string) (index: int64) (batchSize:int) = - ctx.Get(streamName, index, batchSize, direction=Direction.Backward) + let getBackwards (ctx: EqxContext) (streamName: string) (index: int64) (batchSize: int): Async = + ctx.Read(ctx.CreateStream streamName, Position.FromIndexOnly index, batchSize, Direction.Backward) - /// Returns the next index to which events will be appended (0 if stream doesn't exist) - let getNextIndex (ctx:Context) (streamName: string) : Async = - failwith "TODO" \ No newline at end of file + /// Obtains the `index` from the current write Position + let getNextIndex (ctx: EqxContext) (streamName: string) : Async = + ctx.Sync(ctx.CreateStream streamName) |> stripPosition \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosEventsIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs similarity index 67% rename from tests/Equinox.Cosmos.Integration/CosmosEventsIntegration.fs rename to tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 336a1d289..7132ad1a4 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosEventsIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -1,7 +1,8 @@ -module Equinox.Cosmos.Integration.CosmosEventsIntegration +module Equinox.Cosmos.Integration.CoreIntegration open Equinox.Cosmos.Integration.Infrastructure open Equinox.Cosmos +open Equinox.Cosmos.Core open FSharp.Control open Newtonsoft.Json.Linq open Swensen.Unquote @@ -15,11 +16,11 @@ module Null = let defaultValue d x = if x = null then d else x type EventData = { eventType:string; data: byte[] } with - interface Store.IEvent with + interface Events.IEvent with member __.EventType = __.eventType member __.Data = __.data member __.Meta = Encoding.UTF8.GetBytes("{\"m\":\"m\"}") - static member Create(eventType,?json) : Store.IEvent = + static member Create(eventType,?json) : Events.IEvent = { eventType = eventType data = System.Text.Encoding.UTF8.GetBytes(defaultArg json "{\"d\":\"d\"}") } :> _ @@ -32,13 +33,16 @@ type Tests(testOutputHelper) = let (|TestStream|) (name:Guid) = incr testIterations sprintf "events-%O-%i" name !testIterations - let (|TestDbCollStream|) (TestStream streamName) = let (StoreCollection (dbId,collId,streamName)) = streamName in dbId,collId,streamName - let mkContextWithSliceLimit conn dbId collId maxEventsPerSlice = Events.Context(conn,dbId,collId,defaultBatchSize,log,?maxEventsPerSlice=maxEventsPerSlice) + let (|TestDbCollStream|) (TestStream streamName) = + let (StoreCollection (dbId,collId,streamName)) = streamName + dbId,collId,streamName + let mkContextWithSliceLimit conn dbId collId maxEventsPerSlice = + EqxContext(conn,dbId,collId,log,defaultMaxSlices=defaultBatchSize,?maxEventsPerSlice=maxEventsPerSlice) let mkContext conn dbId collId = mkContextWithSliceLimit conn dbId collId None - let verifyRequestChargesBelow rus = + let verifyRequestChargesMax rus = let tripRequestCharges = [ for e, c in capture.RequestCharges -> sprintf "%A" e, c ] - test <@ float rus > Seq.sum (Seq.map snd tripRequestCharges) @> + test <@ float rus >= Seq.sum (Seq.map snd tripRequestCharges) @> [] let append (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { @@ -48,17 +52,17 @@ type Tests(testOutputHelper) = let event = EventData.Create("test_event") let index = 0L let! res = Events.append ctx streamName index [|event|] - test <@ Events.AppendResult.Ok 1L = res @> + test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesBelow 10 + verifyRequestChargesMax 10 // Clear the counters capture.Clear() let! res = Events.append ctx streamName 1L (Array.replicate 5 event) - test <@ Events.AppendResult.Ok 6L = res @> + test <@ AppendResult.Ok 6L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> // We didnt request small batches or splitting so it's not dramatically more expensive to write N events - verifyRequestChargesBelow 11 + verifyRequestChargesMax 11 } let blobEquals (x: byte[]) (y: byte[]) = System.Linq.Enumerable.SequenceEqual(x,y) @@ -75,46 +79,80 @@ type Tests(testOutputHelper) = let index = 0L let event = EventData.Create("test_event") let! res = Events.append ctx streamName index [|event|] - test <@ Events.AppendResult.Ok 1L = res @> + test <@ AppendResult.Ok 1L = res @> let! res = Events.append ctx streamName 1L (Array.replicate 5 event) - test <@ Events.AppendResult.Ok 6L = res @> + test <@ AppendResult.Ok 6L = res @> // Only start counting RUs from here capture.Clear() return Array.replicate 6 event } - let verifyCorrectEventsEx direction baseIndex (expected: Store.IEvent []) (res: Store.IOrderedEvent[]) = + let verifyCorrectEventsEx direction baseIndex (expected: Events.IEvent []) (res: Events.IOrderedEvent[]) = test <@ expected.Length = res.Length @> match direction with - | Store.Direction.Forward -> test <@ [for i in 0..expected.Length - 1 -> baseIndex + int64 i] = [ for r in res -> r.Index ] @> - | Store.Direction.Backward -> test <@ [for i in 0..expected.Length-1 -> baseIndex - int64 i] = [ for r in res -> r.Index ] @> + | Direction.Forward -> test <@ [for i in 0..expected.Length - 1 -> baseIndex + int64 i] = [ for r in res -> r.Index ] @> + | Direction.Backward -> test <@ [for i in 0..expected.Length-1 -> baseIndex - int64 i] = [ for r in res -> r.Index ] @> test <@ [for e in expected -> e.EventType] = [ for r in res -> r.EventType ] @> for i,x,y in Seq.mapi2 (fun i x y -> i,x,y) [for e in expected -> e.Data] [ for r in res -> r.Data ] do verifyUtf8JsonEquals i x y - let verifyCorrectEventsBackward = verifyCorrectEventsEx Store.Direction.Backward - let verifyCorrectEvents = verifyCorrectEventsEx Store.Direction.Forward + let verifyCorrectEventsBackward = verifyCorrectEventsEx Direction.Backward + let verifyCorrectEvents = verifyCorrectEventsEx Direction.Forward [] - let appendAtEnd (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { + let ``appendAtEnd and getNextIndex`` (extras, TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log let ctx = mkContextWithSliceLimit conn dbId collId (Some 1) + // If a fail triggers a rerun, we need to dump the previous log entries captured + capture.Clear() + let! pos = Events.getNextIndex ctx streamName + test <@ [EqxAct.IndexedNotFound] = capture.ExternalCalls @> + 0L =! pos + verifyRequestChargesMax 1 // for a 404 by definition + capture.Clear() + let event = EventData.Create("test_event") let mutable pos = 0L for size in [4; 5; 9] do let! res = Events.appendAtEnd ctx streamName (Array.replicate size event) test <@ [EqxAct.Append] = capture.ExternalCalls @> pos <- pos + int64 size - Events.AppendResult.Ok pos =! res - verifyRequestChargesBelow 15 + pos =! res + verifyRequestChargesMax 20 // 15.59 observed capture.Clear() - let! res = Events.append ctx streamName pos (Array.replicate 42 event) + let! res = Events.appendAtEnd ctx streamName (Array.replicate 42 event) pos <- pos + 42L + pos =! res test <@ [EqxAct.Append] = capture.ExternalCalls @> - Events.AppendResult.Ok pos =! res - verifyRequestChargesBelow 20 + verifyRequestChargesMax 20 + capture.Clear() + + let! res = Events.getNextIndex ctx streamName + test <@ [EqxAct.Indexed] = capture.ExternalCalls @> + verifyRequestChargesMax 2 + capture.Clear() + pos =! res + + // Demonstrate benefit/mechanism for using the Position-based API to avail of the etag tracking + let stream = ctx.CreateStream streamName + + let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 + let! _pos = ctx.NonIdempotentAppend(stream, Array.replicate extrasCount event) + test <@ [EqxAct.Append] = capture.ExternalCalls @> + verifyRequestChargesMax 300 // 278 observed + capture.Clear() + + let! pos = ctx.Sync(stream,?position=None) + test <@ [EqxAct.Indexed] = capture.ExternalCalls @> + verifyRequestChargesMax 50 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded) + capture.Clear() + + let! _pos = ctx.Sync(stream,pos) + test <@ [EqxAct.IndexedCached] = capture.ExternalCalls @> + verifyRequestChargesMax 1 // for a 302 by definition - when an etag IfNotMatch is honored, you only pay one RU + capture.Clear() } [] @@ -127,26 +165,26 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName 1L [|event|] test <@ [EqxAct.Resync] = capture.ExternalCalls @> // The response aligns with a normal conflict in that it passes the entire set of conflicting events () - test <@ Events.AppendResult.Conflict (0L,[||]) = res @> - verifyRequestChargesBelow 5 + test <@ AppendResult.Conflict (0L,[||]) = res @> + verifyRequestChargesMax 5 capture.Clear() // Now write at the correct position let expected = [|event|] let! res = Events.append ctx streamName 0L expected - test <@ Events.AppendResult.Ok 1L = res @> + test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesBelow 10 + verifyRequestChargesMax 10 capture.Clear() // Try overwriting it (a competing consumer would see the same) let! res = Events.append ctx streamName 0L [|event; event|] - // This time we get passed the conflicting events + // This time we get passed the conflicting events - we pay a little for that, but that's unavoidable match res with - | Events.AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e + | AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e | x -> x |> failwithf "Unexpected %A" test <@ [EqxAct.Resync] = capture.ExternalCalls @> - verifyRequestChargesBelow 4 + verifyRequestChargesMax 4 capture.Clear() } @@ -164,7 +202,7 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res test <@ [EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @> - verifyRequestChargesBelow 3 + verifyRequestChargesMax 3 } [] @@ -182,7 +220,7 @@ type Tests(testOutputHelper) = verifyCorrectEventsBackward 4L expected res test <@ [EqxAct.SliceBackward; EqxAct.BatchBackward] = capture.ExternalCalls @> - verifyRequestChargesBelow 3 + verifyRequestChargesMax 3 } // TODO AsyncSeq version @@ -203,7 +241,7 @@ type Tests(testOutputHelper) = // 2 Slices this time test <@ [EqxAct.SliceForward; EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @> - verifyRequestChargesBelow 6 + verifyRequestChargesMax 6 } [] @@ -220,9 +258,7 @@ type Tests(testOutputHelper) = // TODO [implement and] prove laziness test <@ [EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @> - verifyRequestChargesBelow 3 + verifyRequestChargesMax 3 } - // TODO getNextIndex test - - // TODO mine other integration tests + // TODO mine other integration tests \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs index 1e978cb57..6c6858bdd 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs @@ -1,7 +1,7 @@ [] module Equinox.Cosmos.Integration.CosmosFixtures -open Equinox.Cosmos +open Equinox.Cosmos.Builder open System module Option = @@ -31,4 +31,4 @@ let (|StoreCollection|) streamName = databaseId, collectionId, streamName let defaultBatchSize = 500 -let createEqxGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(getMaxBatchSize = (fun () -> batchSize), maxEventsPerSlice=10)) \ No newline at end of file +let createEqxGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(defaultMaxSlices=batchSize, maxEventsPerSlice=10)) \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs index 081589a04..862551cc1 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs @@ -50,32 +50,33 @@ module SerilogHelpers = let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function | (:? ScalarValue as x) -> Some x.Value | _ -> None + open Equinox.Cosmos [] type EqxAct = Append | Resync | Conflict | SliceForward | SliceBackward | BatchForward | BatchBackward | Indexed | IndexedNotFound | IndexedCached let (|EqxAction|) (evt : Equinox.Cosmos.Log.Event) = match evt with - | Equinox.Cosmos.Log.WriteSuccess _ -> EqxAct.Append - | Equinox.Cosmos.Log.WriteResync _ -> EqxAct.Resync - | Equinox.Cosmos.Log.WriteConflict _ -> EqxAct.Conflict - | Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Store.Direction.Forward,_) -> EqxAct.SliceForward - | Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Store.Direction.Backward,_) -> EqxAct.SliceBackward - | Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Store.Direction.Forward,_,_) -> EqxAct.BatchForward - | Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Store.Direction.Backward,_,_) -> EqxAct.BatchBackward - | Equinox.Cosmos.Log.Index _ -> EqxAct.Indexed - | Equinox.Cosmos.Log.IndexNotFound _ -> EqxAct.IndexedNotFound - | Equinox.Cosmos.Log.IndexNotModified _ -> EqxAct.IndexedCached + | Log.WriteSuccess _ -> EqxAct.Append + | Log.WriteResync _ -> EqxAct.Resync + | Log.WriteConflict _ -> EqxAct.Conflict + | Log.Slice (Direction.Forward,_) -> EqxAct.SliceForward + | Log.Slice (Direction.Backward,_) -> EqxAct.SliceBackward + | Log.Batch (Direction.Forward,_,_) -> EqxAct.BatchForward + | Log.Batch (Direction.Backward,_,_) -> EqxAct.BatchBackward + | Log.Index _ -> EqxAct.Indexed + | Log.IndexNotFound _ -> EqxAct.IndexedNotFound + | Log.IndexNotModified _ -> EqxAct.IndexedCached let inline (|Stats|) ({ ru = ru }: Equinox.Cosmos.Log.Measurement) = ru let (|CosmosReadRu|CosmosWriteRu|CosmosResyncRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) = match evt with - | Equinox.Cosmos.Log.Index (Stats s) - | Equinox.Cosmos.Log.IndexNotFound (Stats s) - | Equinox.Cosmos.Log.IndexNotModified (Stats s) - | Equinox.Cosmos.Log.Batch (_,_, (Stats s)) -> CosmosReadRu s - | Equinox.Cosmos.Log.WriteSuccess (Stats s) - | Equinox.Cosmos.Log.WriteConflict (Stats s) -> CosmosWriteRu s - | Equinox.Cosmos.Log.WriteResync (Stats s) -> CosmosResyncRu s + | Log.Index (Stats s) + | Log.IndexNotFound (Stats s) + | Log.IndexNotModified (Stats s) + | Log.Batch (_,_, (Stats s)) -> CosmosReadRu s + | Log.WriteSuccess (Stats s) + | Log.WriteConflict (Stats s) -> CosmosWriteRu s + | Log.WriteResync (Stats s) -> CosmosResyncRu s // slices are rolled up into batches so be sure not to double-count - | Equinox.Cosmos.Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru + | Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru /// Facilitates splitting between events with direct charges vs synthetic events Equinox generates to avoid double counting let (|CosmosRequestCharge|EquinoxChargeRollup|) c = match c with diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 7338443b2..7e57439f0 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -1,7 +1,8 @@ module Equinox.Cosmos.Integration.CosmosIntegration +open Equinox open Equinox.Cosmos.Integration.Infrastructure -open Equinox.Cosmos +open Equinox.Cosmos.Builder open Swensen.Unquote open System.Threading open System @@ -21,7 +22,7 @@ module Cart = let createServiceWithCompaction connection batchSize log = let gateway = createEqxGateway connection batchSize let resolveStream (StoreCollection args) = - EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact).Create(args) + EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.RollingSnapshots compact).Create(args) Backend.Cart.Service(log, resolveStream) let createServiceWithCaching connection batchSize log cache = let gateway = createEqxGateway connection batchSize @@ -30,17 +31,17 @@ module Cart = Backend.Cart.Service(log, resolveStream) let createServiceIndexed connection batchSize log = let gateway = createEqxGateway connection batchSize - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.IndexedSearch index).Create(args) + let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.IndexedSearch index).Create(args) Backend.Cart.Service(log, resolveStream) let createServiceWithCachingIndexed connection batchSize log cache = let gateway = createEqxGateway connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.IndexedSearch index, caching=sliding20m).Create(args) + let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.IndexedSearch index, caching=sliding20m).Create(args) Backend.Cart.Service(log, resolveStream) let createServiceWithCompactionAndCaching connection batchSize log cache = let gateway = createEqxGateway connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact, sliding20m).Create(args) + let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial, Cosmos.AccessStrategy.RollingSnapshots compact, sliding20m).Create(args) Backend.Cart.Service(log, resolveStream) module ContactPreferences = @@ -51,7 +52,7 @@ module ContactPreferences = let resolveStream (StoreCollection args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args) Backend.ContactPreferences.Service(log, resolveStream) let createService createGateway log = - let resolveStream (StoreCollection args) = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(args) + let resolveStream (StoreCollection args) = EqxStreamBuilder(createGateway 1, codec, fold, initial, Cosmos.AccessStrategy.EventsAreState).Create(args) Backend.ContactPreferences.Service(log, resolveStream) #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) diff --git a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj index 9f9a00609..df3ea5b58 100644 --- a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj +++ b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj @@ -12,7 +12,7 @@ - +