Skip to content

Commit

Permalink
Cosmos core events API (#49), remove rolling snapshots
Browse files Browse the repository at this point in the history
* Reorganize, adding Batch structure
* Rework stored procedure
* Remove rolling snapshots
* Add explicit non-indexed mode
  • Loading branch information
bartelink committed Dec 19, 2018
1 parent 360cc82 commit 2fb63f9
Show file tree
Hide file tree
Showing 15 changed files with 1,496 additions and 936 deletions.
2 changes: 1 addition & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Folds =
let toSnapshot (s: State) : Events.Compaction.State =
{ items = [| for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } |] }
let ofCompacted (s: Events.Compaction.State) : State =
{ items = [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
{ items = if s.items = null then [] else [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
let initial = { items = [] }
let evolve (state : State) event =
let updateItems f = { state with items = f state.items }
Expand Down
4 changes: 3 additions & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Events =
type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool }
type Value = { email : string; preferences : Preferences }

let [<Literal>] EventTypeName = "contactPreferencesChanged"
type Event =
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
| [<System.Runtime.Serialization.DataMember(Name = EventTypeName)>]Updated of Value
interface TypeShape.UnionContract.IUnionContract
let eventTypeNames = System.Collections.Generic.HashSet<string>([EventTypeName])

module Folds =
type State = Events.Preferences
Expand Down
18 changes: 9 additions & 9 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.CartIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -22,10 +22,10 @@ let resolveGesStreamWithRollingSnapshots gateway =
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesResolver(gateway, codec, fold, initial).Resolve

let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveEqxStreamWithProjection gateway =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection snapshot).Create
let resolveEqxStreamWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -71,13 +71,13 @@ type Tests(testOutputHelper) =
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithoutCompactionSemantics
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithCompactionEventType
let ``Can roundtrip against Cosmos, correctly folding the events with With Projection`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithProjection
do! act service args
}
14 changes: 7 additions & 7 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.ContactPreferencesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -21,10 +21,10 @@ let resolveStreamGesWithOptimizedStorageSemantics gateway =
let resolveStreamGesWithoutAccessStrategy gateway =
GesResolver(gateway defaultBatchSize, codec, fold, initial).Resolve

let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create
let resolveStreamEqxWithoutCompactionSemantics gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -63,12 +63,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics
do! act service args
}
6 changes: 3 additions & 3 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.FavoritesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -21,7 +21,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceEqx gateway log =
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down Expand Up @@ -58,7 +58,7 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createEqxGateway conn defaultBatchSize
let gateway = createEqxStore conn defaultBatchSize
let service = createServiceEqx gateway log
do! act service args
}
18 changes: 10 additions & 8 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ module EquinoxCosmosInterop =
let action, metric, batches, ru =
match evt with
| Log.WriteSuccess m -> "EqxAppendToStreamAsync", m, None, m.ru
| Log.WriteConflict m -> "EqxAppendToStreamAsync", m, None, m.ru
| Log.WriteConflict m -> "EqxAppendToStreamConflictAsync", m, None, m.ru
| Log.WriteResync m -> "EqxAppendToStreamResyncAsync", m, None, m.ru
| Log.Slice (Direction.Forward,m) -> "EqxReadStreamEventsForwardAsync", m, None, m.ru
| Log.Slice (Direction.Backward,m) -> "EqxReadStreamEventsBackwardAsync", m, None, m.ru
| Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru
Expand Down Expand Up @@ -117,13 +118,14 @@ type Tests() =
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, hooking, extracting and substituting metrics in the logging information`` context cartId skuId = Async.RunSynchronously <| async {
let buffer = ResizeArray<string>()
let ``Can roundtrip against Cosmos, hooking, extracting and substituting metrics in the logging information`` context skuId = Async.RunSynchronously <| async {
let batchSize = defaultBatchSize
let (log,capture) = createLoggerWithMetricsExtraction buffer.Add
let buffer = ConcurrentQueue<string>()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createEqxGateway conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithCompactionEventType gateway)
let itemCount, cartId = batchSize / 2 + 1, cartId ()
do! act buffer capture service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration"
let gateway = createEqxStore conn batchSize
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithProjection gateway)
let itemCount = batchSize / 2 + 1
let cartId = Guid.NewGuid() |> CartId
do! act buffer service itemCount context cartId skuId "Eqx Index " // one is a 404, one is a 200
}
105 changes: 105 additions & 0 deletions src/Equinox.Cosmos/Backoff.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
namespace Equinox.Cosmos

// NB this is a copy of the one in Backend - there is also one in Equinox/Infrastrcture.fs which this will be merged into

open System

/// Given a value, creates a function with one ignored argument which returns the value.
/// A backoff strategy.
/// Accepts the attempt number and returns an interval in milliseconds to wait.
/// If None then backoff should stop.
type Backoff = int -> int option

/// Operations on back off strategies represented as functions (int -> int option)
/// which take an attempt number and produce an interval.
module Backoff =

let inline konst x _ = x
let private checkOverflow x =
if x = System.Int32.MinValue then 2000000000
else x

/// Stops immediately.
let never : Backoff = konst None

/// Always returns a fixed interval.
let linear i : Backoff = konst (Some i)

/// Modifies the interval.
let bind (f:int -> int option) (b:Backoff) =
fun i ->
match b i with
| Some x -> f x
| None -> None

/// Modifies the interval.
let map (f:int -> int) (b:Backoff) : Backoff =
fun i ->
match b i with
| Some x -> f x |> checkOverflow |> Some
| None -> None

/// Bounds the interval.
let bound mx = map (min mx)

/// Creates a back-off strategy which increases the interval exponentially.
let exp (initialIntervalMs:int) (multiplier:float) : Backoff =
fun i -> (float initialIntervalMs) * (pown multiplier i) |> int |> checkOverflow |> Some

/// Randomizes the output produced by a back-off strategy:
/// randomizedInterval = retryInterval * (random in range [1 - randomizationFactor, 1 + randomizationFactor])
let rand (randomizationFactor:float) =
let rand = new System.Random()
let maxRand,minRand = (1.0 + randomizationFactor), (1.0 - randomizationFactor)
map (fun x -> (float x) * (rand.NextDouble() * (maxRand - minRand) + minRand) |> int)

/// Uses a fibonacci sequence to genereate timeout intervals starting from the specified initial interval.
let fib (initialIntervalMs:int) : Backoff =
let rec fib n =
if n < 2 then initialIntervalMs
else fib (n - 1) + fib (n - 2)
fib >> checkOverflow >> Some

/// Creates a stateful back-off strategy which keeps track of the number of attempts,
/// and a reset function which resets attempts to zero.
let keepCount (b:Backoff) : (unit -> int option) * (unit -> unit) =
let i = ref -1
(fun () -> System.Threading.Interlocked.Increment i |> b),
(fun () -> i := -1)

/// Bounds a backoff strategy to a specified maximum number of attempts.
let maxAttempts (max:int) (b:Backoff) : Backoff =
fun n -> if n > max then None else b n


// ------------------------------------------------------------------------------------------------------------------------
// defaults

/// 500ms
let [<Literal>] DefaultInitialIntervalMs = 500

/// 60000ms
let [<Literal>] DefaultMaxIntervalMs = 60000

/// 0.5
let [<Literal>] DefaultRandomizationFactor = 0.5

/// 1.5
let [<Literal>] DefaultMultiplier = 1.5

/// The default exponential and randomized back-off strategy with a provided initial interval.
/// DefaultMaxIntervalMs = 60,000
/// DefaultRandomizationFactor = 0.5
/// DefaultMultiplier = 1.5
let DefaultExponentialBoundedRandomizedOf initialInternal =
exp initialInternal DefaultMultiplier
|> rand DefaultRandomizationFactor
|> bound DefaultMaxIntervalMs

/// The default exponential and randomized back-off strategy.
/// DefaultInitialIntervalMs = 500
/// DefaultMaxIntervalMs = 60,000
/// DefaultRandomizationFactor = 0.5
/// DefaultMultiplier = 1.5
let DefaultExponentialBoundedRandomized = DefaultExponentialBoundedRandomizedOf DefaultInitialIntervalMs
Loading

0 comments on commit 2fb63f9

Please sign in to comment.