Skip to content

Commit

Permalink
Compress snapshots in Index
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 26, 2018
1 parent 5117433 commit 56ba4b9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 19 deletions.
4 changes: 2 additions & 2 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -309,15 +309,15 @@ let main argv =
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
let rus = args.GetResult(Rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus}", rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus)
Equinox.Cosmos.Initialization.initialize conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let res = runTest log conn targs
let read, write = RuCounterSink.Read, RuCounterSink.Write
let total = read+write
log.Information("Total RUs consumed: {totalRus} (R:{readRus}, W:{writeRus})", total, read, write)
log.Information("Total Request Charges sustained in test: {totalRus:n0} (R:{readRus:n0}, W:{writeRus:n0})", total, read, write)
res
| _ -> failwith "init or run is required"
| _ -> failwith "ERROR: please specify memory, es or cosmos Store"
Expand Down
44 changes: 39 additions & 5 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ open Newtonsoft.Json.Linq
open Serilog
open System


[<AutoOpen>]
module DocDbExtensions =
module private DocDbExtensions =
/// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions
let (|AggregateException|) (exn : exn) =
let rec aux (e : exn) =
Expand Down Expand Up @@ -53,6 +52,8 @@ module DocDbExtensions =
| DocDbException (DocDbStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified }

module Store =
open System.IO
open System.IO.Compression
[<NoComparison>]
type Position =
{ collectionUri: Uri; streamName: string; index: int64 option; etag: string option }
Expand Down Expand Up @@ -165,21 +166,54 @@ module Store =
static member Create (pos: Position) eventCount (eds: EventData[]) : IndexEvent =
{ p = pos.streamName; id = IndexEvent.IdConstant; m = pos.IndexRel eventCount; _etag = null
c = [| for ed in eds -> { t = ed.eventType; d = ed.data; m = ed.metadata } |] }
and IndexProjection =
and [<CLIMutable>] IndexProjection =
{ /// The Event Type, used to drive deserialization
t: string // required

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
[<JsonConverter(typeof<Base64ZipUtf8JsonConverter>)>]
d: byte[] // required

/// Optional metadata (null, or same as d, not written if missing)
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
[<JsonConverter(typeof<Base64ZipUtf8JsonConverter>); JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] } // optional
interface IEventData with
member __.EventType = __.t
member __.DataUtf8 = __.d
member __.MetaUtf8 = __.m

/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
/// Only applied to snapshots in the Index
and Base64ZipUtf8JsonConverter() =
inherit JsonConverter()
let pickle (input : byte[]) : string =
if input = null then null else

use output = new MemoryStream()
use compressor = new DeflateStream(output, CompressionLevel.Optimal)
compressor.Write(input,0,input.Length)
compressor.Close()
Convert.ToBase64String(output.ToArray())
let unpickle str : byte[] =
if str = null then null else

let compressedBytes = Convert.FromBase64String str
use input = new MemoryStream(compressedBytes)
use decompressor = new DeflateStream(input, CompressionMode.Decompress)
use output = new MemoryStream()
decompressor.CopyTo(output)
decompressor.Close()
output.ToArray()

override __.CanConvert(objectType) =
typeof<byte[]>.Equals(objectType)
override __.ReadJson(reader, _, _, serializer) =
//( if reader.TokenType = JsonToken.Null then null else
serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
override __.WriteJson(writer, value, serializer) =
let pickled = value |> unbox |> pickle
serializer.Serialize(writer, pickled)

(* Pseudocode:
function sync(p, expectedVersion, windowSize, events) {
if (i == 0) then {
Expand Down
52 changes: 40 additions & 12 deletions tests/Equinox.Cosmos.Integration/VerbatimUtf8JsonConverterTests.fs
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@
module Equinox.Cosmos.Integration.VerbatimUtf8JsonConverterTests

open Equinox.Cosmos
open FsCheck.Xunit
open Newtonsoft.Json
open Swensen.Unquote
open System
open Xunit

let inline serialize (x:'t) =
let serializer = new JsonSerializer()
use sw = new System.IO.StringWriter()
use w = new JsonTextWriter(sw)
serializer.Serialize(w,x)
sw.ToString()

type Embedded = { embed : string }
type Union =
| A of Embedded
| B of Embedded
interface TypeShape.UnionContract.IUnionContract

let mkUnionEncoder () = Equinox.UnionCodec.JsonUtf8.Create<Union>(JsonSerializerSettings())

[<Fact>]
let ``VerbatimUtf8JsonConverter serializes properly`` () =
let unionEncoder = Equinox.UnionCodec.JsonUtf8.Create<_>(JsonSerializerSettings())
let encoded = unionEncoder.Encode(A { embed = "\"" })
let ``VerbatimUtf8JsonConverter encodes correctly`` () =
let encoded = mkUnionEncoder().Encode(A { embed = "\"" })
let e : Store.Event =
{ p = "streamName"; id = string 0; i = 0L
c = DateTimeOffset.MinValue
t = encoded.caseName
d = encoded.payload
m = null }
let res = serialize e
test <@ res.Contains """"d":{"embed":"\""}""" @>
let res = JsonConvert.SerializeObject(e)
test <@ res.Contains """"d":{"embed":"\""}""" @>

type Base64ZipUtf8JsonConverterTests() =
let unionEncoder = mkUnionEncoder ()

[<Fact>]
let ``serializes, achieving compression`` () =
let encoded = unionEncoder.Encode(A { embed = String('x',5000) })
let e : Store.IndexProjection =
{ t = encoded.caseName
d = encoded.payload
m = null }
let res = JsonConvert.SerializeObject e
test <@ res.Contains("\"d\":\"") && res.Length < 100 @>

[<Property>]
let roundtrips value =
let hasNulls =
match value with
| A x | B x when obj.ReferenceEquals(null, x) -> true
| A { embed = x } | B { embed = x } -> obj.ReferenceEquals(null, x)
if hasNulls then () else

let encoded = unionEncoder.Encode value
let e : Store.IndexProjection =
{ t = encoded.caseName
d = encoded.payload
m = null }
let ser = JsonConvert.SerializeObject(e)
test <@ ser.Contains("\"d\":\"") @>
let des = JsonConvert.DeserializeObject<Store.IndexProjection>(ser)
let d : Equinox.UnionCodec.EncodedUnion<_> = { caseName = des.t; payload=des.d }
let decoded = unionEncoder.Decode d
test <@ value = decoded @>

0 comments on commit 56ba4b9

Please sign in to comment.