diff --git a/CHANGELOG.md b/CHANGELOG.md index d244b9ede..58136fb9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) - `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305) - `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338) +- `CosmosStore.CosmosStoreCategory`: Generalize `compressUnfolds` to `shouldCompress` predicate [#436](https://github.com/jet/equinox/pull/436) - `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish) - `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430) - `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 197998d1c..740bc695f 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -505,8 +505,8 @@ module internal Sync = correlationId = e.CorrelationId; causationId = e.CausationId } let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip = { p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null - e = Array.map mkEvent events; u = Array.ofSeq unfolds } - let mkUnfold compressor baseIndex (x: IEventData<_>): Unfold = + e = Array.map mkEvent events; u = unfolds } + let mkUnfold baseIndex (compressor, x: IEventData<_>): Unfold = { i = baseIndex; t = x.Timestamp c = x.EventType; d = compressor x.Data; m = compressor x.Meta } @@ -1064,7 +1064,8 @@ type StoreClient(container: Container, fallback: Container option, query: QueryO type internal StoreCategory<'event, 'state, 'req> ( store: StoreClient, createStoredProcIfNotExistsExactlyOnce: CancellationToken -> System.Threading.Tasks.ValueTask, codec: IEventCodec<'event, EventBody, 'req>, fold, initial: 'state, isOrigin: 'event -> bool, - checkUnfolds, compressUnfolds, mapUnfolds: Choice 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) = + checkUnfolds, shouldCompress, mapUnfolds: Choice 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) = + let shouldCompress = defaultArg shouldCompress (Func<'event, bool>(fun _ -> true)) let fold s xs = (fold : Func<'state, 'event[], 'state>).Invoke(s, xs) let reload (log, streamName, (Token.Unpack pos as streamToken), state) preloaded ct: Task = task { match! store.Reload(log, (streamName, pos), (codec.Decode, isOrigin), ct, ?preview = preloaded) with @@ -1079,14 +1080,14 @@ type internal StoreCategory<'event, 'state, 'req> let state' = fold state events let exp, events, eventsEncoded, projectionsEncoded = let encode e = codec.Encode(req, e) + let encodeU e = (if shouldCompress.Invoke e then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull), encode e match mapUnfolds with | Choice1Of3 () -> SyncExp.Version pos.index, events, Array.map encode events, Seq.empty - | Choice2Of3 unfold -> SyncExp.Version pos.index, events, Array.map encode events, Array.map encode (unfold events state') + | Choice2Of3 unfold -> SyncExp.Version pos.index, events, Array.map encode events, Seq.map encodeU (unfold events state') | Choice3Of3 transmute -> let events', unfolds = transmute events state' - SyncExp.Etag (defaultArg pos.etag null), events', Array.map encode events', Seq.map encode unfolds + SyncExp.Etag (defaultArg pos.etag null), events', Array.map encode events', Seq.map encodeU unfolds let baseIndex = pos.index + int64 (Array.length events) - let renderElement = if compressUnfolds then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull - let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold renderElement baseIndex) + let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold baseIndex) |> Array.ofSeq let batch = Sync.mkBatch streamName eventsEncoded projections do! createStoredProcIfNotExistsExactlyOnce ct match! store.Sync(log, streamName, exp, batch, ct) with @@ -1315,9 +1316,8 @@ type CosmosStoreCategory<'event, 'state, 'req> = // NOTE Unless LoadOption.AnyCachedValue or AllowStale are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). // NOTE re SlidingWindowPrefixed: the recommended approach is to track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip caching, - // Compress Unfolds in Tip. Default: true. NOTE reading uncompressed values requires Version >= 2.3.0 - [] ?compressUnfolds) = - let compressUnfolds = defaultArg compressUnfolds true + // Compress Unfolds in Tip. Default: Compress all Unfolds. NOTE reading uncompressed values requires Version >= 2.3.0 + [] ?shouldCompress) = let isOrigin, checkUnfolds, mapUnfolds = match access with | AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 () @@ -1327,7 +1327,7 @@ type CosmosStoreCategory<'event, 'state, 'req> = | AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton) | AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute { inherit Equinox.Category<'event, 'state, 'req>(name, - StoreCategory<'event, 'state, 'req>(context.StoreClient, context.EnsureStoredProcedureInitialized, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds) + StoreCategory<'event, 'state, 'req>(context.StoreClient, context.EnsureStoredProcedureInitialized, codec, fold, initial, isOrigin, checkUnfolds, shouldCompress, mapUnfolds) |> Caching.apply Token.isStale caching) } module Exceptions =