Skip to content

Commit

Permalink
feat(Cosmos): compressUnfolds -> shouldCompress
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 31, 2023
1 parent 21d18dc commit 1e5c393
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `CosmosStore.CosmosStoreCategory`: Generalize `compressUnfolds` to `shouldCompress` predicate [#436](https://github.com/jet/equinox/pull/436)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)
- `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
Expand Down
22 changes: 11 additions & 11 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ module internal Sync =
correlationId = e.CorrelationId; causationId = e.CausationId }
let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
e = Array.map mkEvent events; u = Array.ofSeq unfolds }
let mkUnfold compressor baseIndex (x: IEventData<_>): Unfold =
e = Array.map mkEvent events; u = unfolds }
let mkUnfold baseIndex (compressor, x: IEventData<_>): Unfold =
{ i = baseIndex; t = x.Timestamp
c = x.EventType; d = compressor x.Data; m = compressor x.Meta }

Expand Down Expand Up @@ -1064,7 +1064,8 @@ type StoreClient(container: Container, fallback: Container option, query: QueryO
type internal StoreCategory<'event, 'state, 'req>
( store: StoreClient, createStoredProcIfNotExistsExactlyOnce: CancellationToken -> System.Threading.Tasks.ValueTask,
codec: IEventCodec<'event, EventBody, 'req>, fold, initial: 'state, isOrigin: 'event -> bool,
checkUnfolds, compressUnfolds, mapUnfolds: Choice<unit, 'event[] -> 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) =
checkUnfolds, shouldCompress, mapUnfolds: Choice<unit, 'event[] -> 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) =
let shouldCompress = defaultArg shouldCompress (Func<'event, bool>(fun _ -> true))
let fold s xs = (fold : Func<'state, 'event[], 'state>).Invoke(s, xs)
let reload (log, streamName, (Token.Unpack pos as streamToken), state) preloaded ct: Task<struct (StreamToken * 'state)> = task {
match! store.Reload(log, (streamName, pos), (codec.Decode, isOrigin), ct, ?preview = preloaded) with
Expand All @@ -1079,14 +1080,14 @@ type internal StoreCategory<'event, 'state, 'req>
let state' = fold state events
let exp, events, eventsEncoded, projectionsEncoded =
let encode e = codec.Encode(req, e)
let encodeU e = (if shouldCompress.Invoke e then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull), encode e
match mapUnfolds with
| Choice1Of3 () -> SyncExp.Version pos.index, events, Array.map encode events, Seq.empty
| Choice2Of3 unfold -> SyncExp.Version pos.index, events, Array.map encode events, Array.map encode (unfold events state')
| Choice2Of3 unfold -> SyncExp.Version pos.index, events, Array.map encode events, Seq.map encodeU (unfold events state')
| Choice3Of3 transmute -> let events', unfolds = transmute events state'
SyncExp.Etag (defaultArg pos.etag null), events', Array.map encode events', Seq.map encode unfolds
SyncExp.Etag (defaultArg pos.etag null), events', Array.map encode events', Seq.map encodeU unfolds
let baseIndex = pos.index + int64 (Array.length events)
let renderElement = if compressUnfolds then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull
let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold renderElement baseIndex)
let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold baseIndex) |> Array.ofSeq
let batch = Sync.mkBatch streamName eventsEncoded projections
do! createStoredProcIfNotExistsExactlyOnce ct
match! store.Sync(log, streamName, exp, batch, ct) with
Expand Down Expand Up @@ -1315,9 +1316,8 @@ type CosmosStoreCategory<'event, 'state, 'req> =
// NOTE Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
// NOTE re SlidingWindowPrefixed: the recommended approach is to track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip
caching,
// Compress Unfolds in Tip. Default: <c>true</c>. NOTE reading uncompressed values requires Version >= 2.3.0
[<O; D null>] ?compressUnfolds) =
let compressUnfolds = defaultArg compressUnfolds true
// Compress Unfolds in Tip. Default: Compress all Unfolds. NOTE reading uncompressed values requires Version >= 2.3.0
[<O; D null>] ?shouldCompress) =
let isOrigin, checkUnfolds, mapUnfolds =
match access with
| AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 ()
Expand All @@ -1327,7 +1327,7 @@ type CosmosStoreCategory<'event, 'state, 'req> =
| AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton)
| AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute
{ inherit Equinox.Category<'event, 'state, 'req>(name,
StoreCategory<'event, 'state, 'req>(context.StoreClient, context.EnsureStoredProcedureInitialized, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds)
StoreCategory<'event, 'state, 'req>(context.StoreClient, context.EnsureStoredProcedureInitialized, codec, fold, initial, isOrigin, checkUnfolds, shouldCompress, mapUnfolds)
|> Caching.apply Token.isStale caching) }

module Exceptions =
Expand Down

0 comments on commit 1e5c393

Please sign in to comment.