From a976fc696a97c445922e6af971d71aafe6224711 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 8 Nov 2018 17:29:48 +0000 Subject: [PATCH] WIP --- src/Equinox.Cosmos/Cosmos.fs | 78 +++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 18 deletions(-) diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index c18efa105..a8bf05c61 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -82,11 +82,17 @@ module Store = { p: string // "{streamName}" id: string // "{-1}" - w: int64 // 100: window size + //w: int64 // 100: window size /// last index/i value m: int64 // {index} - (* "x": [ + /// Compacted projections based on version identified by `m` + c: IndexProjection[] + + (*// Potential schema to manage Pending Events together with compaction events based on each one + // This scheme is more complete than the simple `c` encoding, which relies on every writer being able to write all salient snapshots + // For instance, in the case of blue/green deploys, older versions need to be able to coexist without destroying the perf for eachother + "x": [ { "i":0, "c":"ISO 8601" "e":[ @@ -95,8 +101,22 @@ module Store = ] } ] *) - x: JObject[][] } + //x: JObject[][] + } + static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent = + { p = pos.streamName; id = "-1"; m = pos.IndexRel eventCount + c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] } + and IndexProjection = + { /// The Event Type, used to drive deserialization + t: string // required + + /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb + [)>] + d: byte[] // required + /// Optional metadata (null, or same as d, not written if missing) + [); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>] + m: byte[] } // optional (* Pseudocode: function sync(p, expectedVersion, windowSize, events) { if (i == 0) then { @@ -155,19 +175,23 @@ type EqxSyncResult = Written of Store.Position * requestCharge: float | Conflict module private Write = let [] sprocName = "AtomicMultiDocInsert" - let append (client: IDocumentClient) (pos: Store.Position) (eventsData: Store.EventData seq): Async = async { + let append (client: IDocumentClient) (pos: Store.Position) (eventsData: Store.EventData seq,maybeIndexEvents): Async = async { let sprocUri = sprintf "%O/sprocs/%s" pos.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(pos.streamName)) let! ct = Async.CancellationToken let events = eventsData |> Seq.mapi (fun i ed -> Store.Event.Create pos (i+1) ed |> JsonConvert.SerializeObject) |> Seq.toArray if events.Length = 0 then invalidArg "eventsData" "must be non-empty" - let! res = client.ExecuteStoredProcedureAsync(sprocUri, opts, ct, box events) |> Async.AwaitTaskCorrect + let index : Store.IndexEvent = + match maybeIndexEvents with + | None | Some [||] -> Unchecked.defaultof<_> + | Some eds -> Store.IndexEvent.Create pos (events.Length) eds + let! res = client.ExecuteStoredProcedureAsync(sprocUri, opts, ct, box events, box index) |> Async.AwaitTaskCorrect return { pos with index = Some (pos.IndexRel events.Length) }, res.RequestCharge } /// Yields `EqxSyncResult.Written`, or `EqxSyncResult.Conflict` to signify WrongExpectedVersion - let private writeEventsAsync (log : ILogger) client pk (events : Store.EventData[]): Async = async { + let private writeEventsAsync (log : ILogger) client pk (events : Store.EventData[],maybeIndexEvents): Async = async { try - let! wr = append client pk events + let! wr = append client pk (events,maybeIndexEvents) return EqxSyncResult.Written wr with :? DocumentClientException as ex when ex.Message.Contains "already" -> // TODO this does not work for the SP log.Information(ex, "Eqx TrySync WrongExpectedVersionException writing {EventTypes}", [| for x in events -> x.eventType |]) @@ -177,12 +201,12 @@ module private Write = let eventDataLen ({ data = Log.BlobLen bytes; metadata = Log.BlobLen metaBytes } : Store.EventData) = bytes + metaBytes events |> Array.sumBy eventDataLen - let private writeEventsLogged client (pos : Store.Position) (events : Store.EventData[]) (log : ILogger): Async = async { + let private writeEventsLogged client (pos : Store.Position) (events : Store.EventData[], maybeIndexEvents) (log : ILogger): Async = async { let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEventData "Json" events let bytes, count = bytes events, events.Length let log = log |> Log.prop "bytes" bytes let writeLog = log |> Log.prop "stream" pos.streamName |> Log.prop "expectedVersion" pos.Index |> Log.prop "count" count - let! t, result = writeEventsAsync writeLog client pos events |> Stopwatch.Time + let! t, result = writeEventsAsync writeLog client pos (events,maybeIndexEvents) |> Stopwatch.Time let (ru: float), resultLog = let mkMetric ru : Log.Measurement = { stream = pos.streamName; interval = t; bytes = bytes; count = count; ru = ru } match result with @@ -191,8 +215,8 @@ module private Write = resultLog.Information("Eqx {action:l} {count} {ms}ms rc={ru}", "Write", events.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru) return result } - let writeEvents (log : ILogger) retryPolicy client pk (events : Store.EventData[]): Async = - let call = writeEventsLogged client pk events + let writeEvents (log : ILogger) retryPolicy client pk (events : Store.EventData[],maybeIndexEvents): Async = + let call = writeEventsLogged client pk (events,maybeIndexEvents) Log.withLoggedRetries retryPolicy "writeAttempt" call log module private Read = @@ -397,8 +421,8 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = match events |> Array.tryFindBack isCompactionEvent with | None -> return Token.ofPreviousTokenAndEventsLength token events.Length batching.BatchSize pos, events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize pos, events } - member __.TrySync log (Pos pos as token) (encodedEvents: Store.EventData[]) isCompactionEventType: Async = async { - let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.Client pos encodedEvents + member __.TrySync log (Pos pos as token) (encodedEvents: Store.EventData[],maybeIndexEvents) isCompactionEventType: Async = async { + let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.Client pos (encodedEvents,maybeIndexEvents) match wr with | EqxSyncResult.Conflict _ -> return GatewaySyncResult.Conflict | EqxSyncResult.Written (wr, _) -> @@ -419,10 +443,16 @@ type private Collection(gateway : EqxGateway, databaseId, collectionId) = member __.Gateway = gateway member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId) +[] +type SearchStrategy<'event> = + | EventType of string + | Predicate of ('event -> bool) + [] type AccessStrategy<'event,'state> = | EventsAreState | RollingSnapshots of eventType: string * compact: ('state -> 'event) + | IndexedSearch of predicate: ('event -> bool) * index: ('state -> 'event seq) type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = /// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes) @@ -432,13 +462,20 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni let (|Pos|) streamName : Store.Position = { collectionUri = coll.CollectionUri; streamName = streamName; index = None } let compactionPredicate = match access with + | Some (AccessStrategy.IndexedSearch _) | None -> None | Some AccessStrategy.EventsAreState -> Some (fun _ -> true) | Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et) + let searchPredicate = + match access with + | None -> None + | Some AccessStrategy.EventsAreState -> Some (SearchStrategy.Predicate (fun _ -> true)) + | Some (AccessStrategy.IndexedSearch (ep,_)) -> Some (SearchStrategy.Predicate ep) let loadAlgorithm load (Pos pos) initial log = let batched = load initial (coll.Gateway.LoadBatched log None pos) let compacted predicate = load initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate pos) match access with + | Some (AccessStrategy.IndexedSearch _) | None -> batched | Some AccessStrategy.EventsAreState -> compacted (fun _ -> true) | Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et) @@ -452,14 +489,18 @@ type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUni member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) (token : Storage.StreamToken, state : 'state) (events : 'event list, state' : 'state) : Async> = async { - let events = + let events, index = match access with - | None | Some AccessStrategy.EventsAreState -> events + | None | Some AccessStrategy.EventsAreState -> + events, None | Some (AccessStrategy.RollingSnapshots (_,f)) -> let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) - if cc.IsCompactionDue then events @ [f state'] else events + (if cc.IsCompactionDue then events @ [f state'] else events), None + | Some (AccessStrategy.IndexedSearch (_,index)) -> + events, Some (index state') let encodedEvents : Store.EventData[] = UnionEncoderAdapters.encodeEvents codec (Seq.ofList events) - let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionPredicate + let maybeIndexEvents : Store.EventData[] option = index |> Option.map (UnionEncoderAdapters.encodeEvents codec) + let! syncRes = coll.Gateway.TrySync log token (encodedEvents,maybeIndexEvents) compactionPredicate match syncRes with | GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionPredicate true)) | GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) } @@ -590,7 +631,8 @@ module Initialization = return coll.Resource.Id } let createProc (client: IDocumentClient) (collectionUri: Uri) = async { - let f ="""function multidocInsert(docs) { + let f ="""function multidocInsert(docs,index) { + // TODO insert or update the index, verifying expectedVersion var response = getContext().getResponse(); var collection = getContext().getCollection(); var collectionLink = collection.getSelfLink();