Skip to content

Commit

Permalink
Add getNextIndex, flesh out and namespace APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 20, 2018
1 parent 01a9f65 commit da5d45f
Show file tree
Hide file tree
Showing 11 changed files with 718 additions and 569 deletions.
10 changes: 5 additions & 5 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

open Argu
open Domain
open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.EventStore
open Infrastructure
open Serilog
Expand Down Expand Up @@ -121,7 +121,7 @@ module Cosmos =
let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let createGateway connection (batchSize,pageSize) = EqxGateway(connection, EqxBatchingPolicy(getMaxBatchSize = (fun () -> batchSize), maxEventsPerSlice = pageSize))
let createGateway connection (maxBatches,maxEvents) = EqxGateway(connection, EqxBatchingPolicy(defaultMaxSlices=maxBatches, maxEventsPerSlice = maxEvents))

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Store =
Expand All @@ -144,8 +144,8 @@ module Test =
let createFavoritesService store (targs: ParseResults<TestArguments>) log =
let cache =
if targs.Contains Cached then
let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
match store with
Expand Down Expand Up @@ -315,7 +315,7 @@ let main argv =
| Some (Provision args) ->
let rus = args.GetResult(Rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus)
Equinox.Cosmos.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn (defaultBatchSize,pageSize), dbName, collName)
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.CartIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.ContactPreferencesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.FavoritesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand Down
1 change: 0 additions & 1 deletion samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ module EquinoxEsInterop =
{ action = action; stream = metric.stream; interval = metric.interval; bytes = metric.bytes; count = metric.count; batches = batches }
module EquinoxCosmosInterop =
open Equinox.Cosmos
open Equinox.Cosmos.Store
[<NoEquality; NoComparison>]
type FlatMetric = { action: string; stream: string; interval: StopwatchInterval; bytes: int; count: int; batches: int option; ru: float } with
override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru
Expand Down
1,102 changes: 607 additions & 495 deletions src/Equinox.Cosmos/Cosmos.fs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Equinox.Cosmos.Integration.CosmosEventsIntegration
module Equinox.Cosmos.Integration.CoreIntegration

open Equinox.Cosmos.Integration.Infrastructure
open Equinox.Cosmos
open Equinox.Cosmos.Core
open FSharp.Control
open Newtonsoft.Json.Linq
open Swensen.Unquote
Expand All @@ -15,11 +16,11 @@ module Null =
let defaultValue d x = if x = null then d else x

type EventData = { eventType:string; data: byte[] } with
interface Store.IEvent with
interface Events.IEvent with
member __.EventType = __.eventType
member __.Data = __.data
member __.Meta = Encoding.UTF8.GetBytes("{\"m\":\"m\"}")
static member Create(eventType,?json) : Store.IEvent =
static member Create(eventType,?json) : Events.IEvent =
{ eventType = eventType
data = System.Text.Encoding.UTF8.GetBytes(defaultArg json "{\"d\":\"d\"}") } :> _

Expand All @@ -32,13 +33,16 @@ type Tests(testOutputHelper) =
let (|TestStream|) (name:Guid) =
incr testIterations
sprintf "events-%O-%i" name !testIterations
let (|TestDbCollStream|) (TestStream streamName) = let (StoreCollection (dbId,collId,streamName)) = streamName in dbId,collId,streamName
let mkContextWithSliceLimit conn dbId collId maxEventsPerSlice = Events.Context(conn,dbId,collId,defaultBatchSize,log,?maxEventsPerSlice=maxEventsPerSlice)
let (|TestDbCollStream|) (TestStream streamName) =
let (StoreCollection (dbId,collId,streamName)) = streamName
dbId,collId,streamName
let mkContextWithSliceLimit conn dbId collId maxEventsPerSlice =
EqxContext(conn,dbId,collId,log,defaultMaxSlices=defaultBatchSize,?maxEventsPerSlice=maxEventsPerSlice)
let mkContext conn dbId collId = mkContextWithSliceLimit conn dbId collId None

let verifyRequestChargesBelow rus =
let verifyRequestChargesMax rus =
let tripRequestCharges = [ for e, c in capture.RequestCharges -> sprintf "%A" e, c ]
test <@ float rus > Seq.sum (Seq.map snd tripRequestCharges) @>
test <@ float rus >= Seq.sum (Seq.map snd tripRequestCharges) @>

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let append (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async {
Expand All @@ -48,17 +52,17 @@ type Tests(testOutputHelper) =
let event = EventData.Create("test_event")
let index = 0L
let! res = Events.append ctx streamName index [|event|]
test <@ Events.AppendResult.Ok 1L = res @>
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesBelow 10
verifyRequestChargesMax 10
// Clear the counters
capture.Clear()

let! res = Events.append ctx streamName 1L (Array.replicate 5 event)
test <@ Events.AppendResult.Ok 6L = res @>
test <@ AppendResult.Ok 6L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
// We didnt request small batches or splitting so it's not dramatically more expensive to write N events
verifyRequestChargesBelow 11
verifyRequestChargesMax 11
}

let blobEquals (x: byte[]) (y: byte[]) = System.Linq.Enumerable.SequenceEqual(x,y)
Expand All @@ -75,46 +79,80 @@ type Tests(testOutputHelper) =
let index = 0L
let event = EventData.Create("test_event")
let! res = Events.append ctx streamName index [|event|]
test <@ Events.AppendResult.Ok 1L = res @>
test <@ AppendResult.Ok 1L = res @>
let! res = Events.append ctx streamName 1L (Array.replicate 5 event)
test <@ Events.AppendResult.Ok 6L = res @>
test <@ AppendResult.Ok 6L = res @>
// Only start counting RUs from here
capture.Clear()
return Array.replicate 6 event
}

let verifyCorrectEventsEx direction baseIndex (expected: Store.IEvent []) (res: Store.IOrderedEvent[]) =
let verifyCorrectEventsEx direction baseIndex (expected: Events.IEvent []) (res: Events.IOrderedEvent[]) =
test <@ expected.Length = res.Length @>
match direction with
| Store.Direction.Forward -> test <@ [for i in 0..expected.Length - 1 -> baseIndex + int64 i] = [ for r in res -> r.Index ] @>
| Store.Direction.Backward -> test <@ [for i in 0..expected.Length-1 -> baseIndex - int64 i] = [ for r in res -> r.Index ] @>
| Direction.Forward -> test <@ [for i in 0..expected.Length - 1 -> baseIndex + int64 i] = [ for r in res -> r.Index ] @>
| Direction.Backward -> test <@ [for i in 0..expected.Length-1 -> baseIndex - int64 i] = [ for r in res -> r.Index ] @>

test <@ [for e in expected -> e.EventType] = [ for r in res -> r.EventType ] @>
for i,x,y in Seq.mapi2 (fun i x y -> i,x,y) [for e in expected -> e.Data] [ for r in res -> r.Data ] do
verifyUtf8JsonEquals i x y
let verifyCorrectEventsBackward = verifyCorrectEventsEx Store.Direction.Backward
let verifyCorrectEvents = verifyCorrectEventsEx Store.Direction.Forward
let verifyCorrectEventsBackward = verifyCorrectEventsEx Direction.Backward
let verifyCorrectEvents = verifyCorrectEventsEx Direction.Forward

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let appendAtEnd (TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async {
let ``appendAtEnd and getNextIndex`` (extras, TestDbCollStream (dbId,collId,streamName)) = Async.RunSynchronously <| async {
let! conn = connectToSpecifiedCosmosOrSimulator log
let ctx = mkContextWithSliceLimit conn dbId collId (Some 1)

// If a fail triggers a rerun, we need to dump the previous log entries captured
capture.Clear()
let! pos = Events.getNextIndex ctx streamName
test <@ [EqxAct.IndexedNotFound] = capture.ExternalCalls @>
0L =! pos
verifyRequestChargesMax 1 // for a 404 by definition
capture.Clear()

let event = EventData.Create("test_event")
let mutable pos = 0L
for size in [4; 5; 9] do
let! res = Events.appendAtEnd ctx streamName (Array.replicate size event)
test <@ [EqxAct.Append] = capture.ExternalCalls @>
pos <- pos + int64 size
Events.AppendResult.Ok pos =! res
verifyRequestChargesBelow 15
pos =! res
verifyRequestChargesMax 20 // 15.59 observed
capture.Clear()

let! res = Events.append ctx streamName pos (Array.replicate 42 event)
let! res = Events.appendAtEnd ctx streamName (Array.replicate 42 event)
pos <- pos + 42L
pos =! res
test <@ [EqxAct.Append] = capture.ExternalCalls @>
Events.AppendResult.Ok pos =! res
verifyRequestChargesBelow 20
verifyRequestChargesMax 20
capture.Clear()

let! res = Events.getNextIndex ctx streamName
test <@ [EqxAct.Indexed] = capture.ExternalCalls @>
verifyRequestChargesMax 2
capture.Clear()
pos =! res

// Demonstrate benefit/mechanism for using the Position-based API to avail of the etag tracking
let stream = ctx.CreateStream streamName

let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100
let! _pos = ctx.NonIdempotentAppend(stream, Array.replicate extrasCount event)
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 300 // 278 observed
capture.Clear()

let! pos = ctx.Sync(stream,?position=None)
test <@ [EqxAct.Indexed] = capture.ExternalCalls @>
verifyRequestChargesMax 50 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
capture.Clear()

let! _pos = ctx.Sync(stream,pos)
test <@ [EqxAct.IndexedCached] = capture.ExternalCalls @>
verifyRequestChargesMax 1 // for a 302 by definition - when an etag IfNotMatch is honored, you only pay one RU
capture.Clear()
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand All @@ -127,26 +165,26 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName 1L [|event|]
test <@ [EqxAct.Resync] = capture.ExternalCalls @>
// The response aligns with a normal conflict in that it passes the entire set of conflicting events ()
test <@ Events.AppendResult.Conflict (0L,[||]) = res @>
verifyRequestChargesBelow 5
test <@ AppendResult.Conflict (0L,[||]) = res @>
verifyRequestChargesMax 5
capture.Clear()

// Now write at the correct position
let expected = [|event|]
let! res = Events.append ctx streamName 0L expected
test <@ Events.AppendResult.Ok 1L = res @>
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesBelow 10
verifyRequestChargesMax 10
capture.Clear()

// Try overwriting it (a competing consumer would see the same)
let! res = Events.append ctx streamName 0L [|event; event|]
// This time we get passed the conflicting events
// This time we get passed the conflicting events - we pay a little for that, but that's unavoidable
match res with
| Events.AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e
| AppendResult.Conflict (1L, e) -> verifyCorrectEvents 0L expected e
| x -> x |> failwithf "Unexpected %A"
test <@ [EqxAct.Resync] = capture.ExternalCalls @>
verifyRequestChargesBelow 4
verifyRequestChargesMax 4
capture.Clear()
}

Expand All @@ -164,7 +202,7 @@ type Tests(testOutputHelper) =
verifyCorrectEvents 1L expected res

test <@ [EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @>
verifyRequestChargesBelow 3
verifyRequestChargesMax 3
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand All @@ -182,7 +220,7 @@ type Tests(testOutputHelper) =
verifyCorrectEventsBackward 4L expected res

test <@ [EqxAct.SliceBackward; EqxAct.BatchBackward] = capture.ExternalCalls @>
verifyRequestChargesBelow 3
verifyRequestChargesMax 3
}

// TODO AsyncSeq version
Expand All @@ -203,7 +241,7 @@ type Tests(testOutputHelper) =

// 2 Slices this time
test <@ [EqxAct.SliceForward; EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @>
verifyRequestChargesBelow 6
verifyRequestChargesMax 6
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand All @@ -220,9 +258,7 @@ type Tests(testOutputHelper) =

// TODO [implement and] prove laziness
test <@ [EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @>
verifyRequestChargesBelow 3
verifyRequestChargesMax 3
}

// TODO getNextIndex test

// TODO mine other integration tests
// TODO mine other integration tests
4 changes: 2 additions & 2 deletions tests/Equinox.Cosmos.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[<AutoOpen>]
module Equinox.Cosmos.Integration.CosmosFixtures

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open System

module Option =
Expand Down Expand Up @@ -31,4 +31,4 @@ let (|StoreCollection|) streamName =
databaseId, collectionId, streamName

let defaultBatchSize = 500
let createEqxGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(getMaxBatchSize = (fun () -> batchSize), maxEventsPerSlice=10))
let createEqxGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(defaultMaxSlices=batchSize, maxEventsPerSlice=10))
37 changes: 19 additions & 18 deletions tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,33 @@ module SerilogHelpers =
let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
open Equinox.Cosmos
[<RequireQualifiedAccess>]
type EqxAct = Append | Resync | Conflict | SliceForward | SliceBackward | BatchForward | BatchBackward | Indexed | IndexedNotFound | IndexedCached
let (|EqxAction|) (evt : Equinox.Cosmos.Log.Event) =
match evt with
| Equinox.Cosmos.Log.WriteSuccess _ -> EqxAct.Append
| Equinox.Cosmos.Log.WriteResync _ -> EqxAct.Resync
| Equinox.Cosmos.Log.WriteConflict _ -> EqxAct.Conflict
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Store.Direction.Forward,_) -> EqxAct.SliceForward
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Store.Direction.Backward,_) -> EqxAct.SliceBackward
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Store.Direction.Forward,_,_) -> EqxAct.BatchForward
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Store.Direction.Backward,_,_) -> EqxAct.BatchBackward
| Equinox.Cosmos.Log.Index _ -> EqxAct.Indexed
| Equinox.Cosmos.Log.IndexNotFound _ -> EqxAct.IndexedNotFound
| Equinox.Cosmos.Log.IndexNotModified _ -> EqxAct.IndexedCached
| Log.WriteSuccess _ -> EqxAct.Append
| Log.WriteResync _ -> EqxAct.Resync
| Log.WriteConflict _ -> EqxAct.Conflict
| Log.Slice (Direction.Forward,_) -> EqxAct.SliceForward
| Log.Slice (Direction.Backward,_) -> EqxAct.SliceBackward
| Log.Batch (Direction.Forward,_,_) -> EqxAct.BatchForward
| Log.Batch (Direction.Backward,_,_) -> EqxAct.BatchBackward
| Log.Index _ -> EqxAct.Indexed
| Log.IndexNotFound _ -> EqxAct.IndexedNotFound
| Log.IndexNotModified _ -> EqxAct.IndexedCached
let inline (|Stats|) ({ ru = ru }: Equinox.Cosmos.Log.Measurement) = ru
let (|CosmosReadRu|CosmosWriteRu|CosmosResyncRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
match evt with
| Equinox.Cosmos.Log.Index (Stats s)
| Equinox.Cosmos.Log.IndexNotFound (Stats s)
| Equinox.Cosmos.Log.IndexNotModified (Stats s)
| Equinox.Cosmos.Log.Batch (_,_, (Stats s)) -> CosmosReadRu s
| Equinox.Cosmos.Log.WriteSuccess (Stats s)
| Equinox.Cosmos.Log.WriteConflict (Stats s) -> CosmosWriteRu s
| Equinox.Cosmos.Log.WriteResync (Stats s) -> CosmosResyncRu s
| Log.Index (Stats s)
| Log.IndexNotFound (Stats s)
| Log.IndexNotModified (Stats s)
| Log.Batch (_,_, (Stats s)) -> CosmosReadRu s
| Log.WriteSuccess (Stats s)
| Log.WriteConflict (Stats s) -> CosmosWriteRu s
| Log.WriteResync (Stats s) -> CosmosResyncRu s
// slices are rolled up into batches so be sure not to double-count
| Equinox.Cosmos.Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru
| Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru
/// Facilitates splitting between events with direct charges vs synthetic events Equinox generates to avoid double counting
let (|CosmosRequestCharge|EquinoxChargeRollup|) c =
match c with
Expand Down
Loading

0 comments on commit da5d45f

Please sign in to comment.