-
Notifications
You must be signed in to change notification settings - Fork 70
/
Copy pathDynamoStore.fs
1551 lines (1398 loc) · 99.7 KB
/
DynamoStore.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
namespace Equinox.DynamoStore.Core
open Equinox.Core
open FsCodec
open FSharp.AWS.DynamoDB
open FSharp.Control
open Serilog
open System
open System.Collections.Generic
open System.IO
open System.Threading
open System.Threading.Tasks
/// Internal representation of an event body: raw payload stream plus an integer encoding/compression marker (0 = none)
[<Struct; NoEquality; NoComparison>]
type InternalBody = { encoding : int; data : MemoryStream }
module private InternalBody =
    /// Maps the schema-level form (optional stream, optional encoding flag) to the internal struct
    /// NOTE a None stream round-trips as a null `data` field; a None encoding maps to 0
    let ofStreamAndEncoding (stream : MemoryStream option, encoding : int option) : InternalBody =
        let stream = Option.toObj stream
        { encoding = defaultArg encoding 0; data = stream }
    /// Inverse of ofStreamAndEncoding: null data -> None, encoding 0 -> None
    let toStreamAndEncoding (encoded : InternalBody) =
        Option.ofObj encoded.data, match encoded.encoding with 0 -> None | x -> Some x
    /// Payload length in bytes; 0 when there is no body
    let bytes (x : InternalBody) =
        if x.data = null then 0
        else int x.data.Length
/// A single Domain Event from the array held in a Batch
[<NoEquality; NoComparison>]
type Event =
    { /// Index number within stream, not persisted (computed from Batch's `n` and the index within `e`)
      i : int
      /// Creation Timestamp, as set by the application layer at the point of rendering the Event
      t : DateTimeOffset
      /// The Event Type (Case) that defines the content of the Data (and Metadata) fields
      c : string
      /// Main event body; required
      d : InternalBody
      /// Optional metadata, encoded as per 'd'; can be Empty
      m : InternalBody
      /// CorrelationId; stored as x (signifying transactionId), or null
      correlationId : string option
      /// CausationId; stored as y (signifying why), or null
      causationId : string option }
    // Adapts this storage-level record to the FsCodec timeline event contract
    interface ITimelineEvent<InternalBody> with
        member x.Index = x.i
        member x.IsUnfold = false
        member x.Context = null
        member x.Size = Event.Bytes x
        member x.EventType = x.c
        member x.Data = x.d
        member x.Meta = x.m
        member _.EventId = Guid.Empty
        member x.CorrelationId = Option.toObj x.correlationId
        member x.CausationId = Option.toObj x.causationId
        member x.Timestamp = x.t
    /// Estimated stored size of the Event in bytes; the two trailing constants allow for the timestamp and per-item overhead
    static member Bytes(x : Event) =
        let inline len x = match x with Some (s : string) -> s.Length | None -> 0
        x.c.Length + InternalBody.bytes x.d + InternalBody.bytes x.m + len x.correlationId + len x.causationId + 20 (*t*) + 20 (*overhead*)
module Event =
    /// Sum of the estimated sizes of the supplied Events
    let arrayBytes (xs : Event array) = Array.sumBy Event.Bytes xs
/// Compaction/Snapshot/Projection Event based on the state at a given point in time `i`
[<NoEquality; NoComparison>]
type Unfold =
    { /// Base: Stream Position (Version) of State from which this Unfold was generated
      i : int64
      /// Generation datetime
      t : DateTimeOffset
      /// The Case (Event Type) of this snapshot, used to drive deserialization
      c : string // required
      /// Event body
      d : InternalBody // required
      /// Optional metadata, can be Empty
      m : InternalBody }
    // Adapts this storage-level record to the FsCodec timeline event contract (IsUnfold = true distinguishes it from real Events)
    interface ITimelineEvent<InternalBody> with
        member x.Index = x.i
        member x.IsUnfold = true
        member x.Context = null
        member x.Size = Unfold.Bytes x
        member x.EventType = x.c
        member x.Data = x.d
        member x.Meta = x.m
        member _.EventId = Guid.Empty
        member x.CorrelationId = null
        member x.CausationId = null
        member x.Timestamp = x.t
    /// Estimated stored size in bytes; the constant 50 is a fixed per-item overhead allowance
    static member Bytes(x : Unfold) = x.c.Length + InternalBody.bytes x.d + InternalBody.bytes x.m + 50
module Unfold =
    /// Sum of estimated sizes; tolerates a null array (treated as empty)
    let arrayBytes (xs : Unfold array) = match xs with null -> 0 | u -> Array.sumBy Unfold.Bytes u
/// The abstract storage format for a Batch of Events represented in a DynamoDB Item
/// NOTE See Batch.Schema buddy type for what actually gets stored
/// NOTE names are intended to generally align with CosmosStore naming. Key Diffs:
/// - no mandatory `id` and/or requirement for it to be a `string` -> replaced with `i` as an int64
///   (also Tip magic value is tipMagicI: Int32.MaxValue, not "-1")
/// - etag is managed explicitly (on Cosmos DB, its managed by the service and named "_etag")
[<NoEquality; NoComparison>]
type Batch =
    { /// Partition/hash key: the stream name
      p : string // "{streamName}"
      /// (Tip Batch only) Number of bytes held in predecessor Batches
      b : int option
      /// base 'i' value for the Events held herein
      i : int64 // tipMagicI for the Tip
      /// Marker on which compare-and-swap operations on Tip are predicated
      etag : string
      /// `i` value for successor batch (to facilitate identifying which Batch a given startPos is within)
      n : int64
      /// The Domain Events (as opposed to Unfolded Events in `u`) for this page of the stream
      e : Event array
      /// Compaction/Snapshot/Projection quasi-events
      u : Unfold array }
/// Maps between the abstract Batch representation and the concrete Batch.Schema persisted form,
/// plus helpers for Tip identification and size estimation
module Batch =
    /// NOTE QueryIAndNOrderByNAscending and others rely on this, when used as the [<RangeKey>], sorting after the other items
    let tipMagicI = int64 Int32.MaxValue
    /// Composite key addressing the Tip Item of the specified stream
    let tableKeyForStreamTip stream = TableKey.Combined(stream, tipMagicI)
    /// Determines whether the supplied `i` value bears the Tip's magic marker value
    let isTip i = i = tipMagicI
    /// The concrete storage format
    [<NoEquality; NoComparison>]
    type Schema =
        { [<HashKey>]
          p : string
          [<RangeKey>]
          i : int64 // tipMagicI for the Tip
          b : int option // iff Tip: bytes in predecessor batches
          etag : string option
          n : int64
          // Count of items written in the most recent insert/update - used by the DDB Streams Consumer to identify the fresh events
          a : int
          // NOTE the per-event e.c values are actually stored here, so they can be selected out without hydrating the bodies
          c : string array
          // NOTE as per Event, but without c and t fields; we instead unroll those as arrays at top level
          e : EventSchema array
          u : UnfoldSchema array }
    and [<NoEquality; NoComparison>] EventSchema =
        { t : DateTimeOffset // NOTE there has to be a single non-`option` field per record, or a trailing insert will be stripped
          d : MemoryStream option; D : int option // D carries encoding, None -> 0 // required
          m : MemoryStream option; M : int option // M carries encoding, None -> 0
          x : string option
          y : string option }
    and [<NoEquality; NoComparison>] UnfoldSchema =
        { i : int64
          t : DateTimeOffset
          c : string // required
          d : MemoryStream option; D : int option // D carries encoding, None -> 0 // required
          m : MemoryStream option; M : int option } // M carries encoding, None -> 0
    /// Projects an Event into its persisted shape (case name is hoisted separately via eventsToSchema)
    let private toEventSchema (x : Event) : EventSchema =
        let (d, D), (m, M) = InternalBody.toStreamAndEncoding x.d, InternalBody.toStreamAndEncoding x.m
        { t = x.t; d = d; D = D; m = m; M = M; x = x.correlationId; y = x.causationId }
    /// Splits events into the top-level case-name array plus the per-event schema records
    let eventsToSchema (xs : Event array) : (*case*) string array * EventSchema array =
        xs |> Array.map (fun x -> x.c), xs |> Array.map toEventSchema
    let private toUnfoldSchema (x : Unfold) : UnfoldSchema =
        let (d, D), (m, M) = InternalBody.toStreamAndEncoding x.d, InternalBody.toStreamAndEncoding x.m
        { i = x.i; t = x.t; c = x.c; d = d; D = D; m = m; M = M }
    let unfoldsToSchema = Array.map toUnfoldSchema
    let private ofUnfoldSchema (x : UnfoldSchema) : Unfold =
        { i = x.i; t = x.t; c = x.c; d = InternalBody.ofStreamAndEncoding (x.d, x.D); m = InternalBody.ofStreamAndEncoding (x.m, x.M) }
    /// Hydrates a persisted Schema Item into the abstract Batch form, re-deriving each Event's absolute index from `n`
    let ofSchema (x : Schema) : Batch =
        let baseIndex = int x.n - x.e.Length
        let events =
            Seq.zip x.c x.e
            |> Seq.mapi (fun i (c, e) ->
                let data, meta = InternalBody.ofStreamAndEncoding (e.d, e.D), InternalBody.ofStreamAndEncoding (e.m, e.M)
                { i = baseIndex + i; t = e.t; d = data; m = meta; correlationId = e.x; causationId = e.y; c = c })
        { p = x.p; b = x.b; i = x.i; etag = Option.toObj x.etag; n = x.n; e = Seq.toArray events; u = x.u |> Array.map ofUnfoldSchema }
    /// Yields the Events within the supplied [minIndex, maxIndex) window (defaults: unbounded)
    let enumEvents (minIndex, maxIndex) (x : Batch) : Event seq =
        let indexMin, indexMax = defaultArg minIndex 0L, defaultArg maxIndex Int64.MaxValue
        // If we're loading from a nominated position, we need to discard items in the batch before/after the start on the start page
        // (`i` is already int64 here; the original applied a redundant second int64 conversion in the upper-bound comparison)
        x.e |> Seq.filter (fun e -> let i = int64 e.i in i >= indexMin && i < indexMax)
    /// Computes base Index for the Item (`i` can bear the magic value TipI when the Item is the Tip)
    let baseIndex (x : Batch) = x.n - x.e.LongLength
    let bytesUnfolds (x : Batch) = Unfold.arrayBytes x.u
    /// Estimated size of the Item excluding unfolds; 80 is a fixed overhead allowance; String.length tolerates a null etag
    let bytesBase (x : Batch) = 80 + x.p.Length + String.length x.etag + Event.arrayBytes x.e
    let bytesTotal (xs : Batch seq) = xs |> Seq.sumBy (fun x -> bytesBase x + bytesUnfolds x)
/// The (encoding marker, raw bytes) pair form in which event bodies are surfaced to the codec layer
type EncodedBody = (struct (int * ReadOnlyMemory<byte>))
module EncodedBody =
    /// Projects the internal stream-based body into the (encoding, bytes) pair; a null stream becomes empty bytes
    let private decodeBody (raw : InternalBody) : EncodedBody =
        raw.encoding, if raw.data = null then ReadOnlyMemory.Empty else raw.data.ToArray() |> ReadOnlyMemory
    let internal ofInternal = Core.TimelineEvent.Map decodeBody
    /// Inverse mapping: empty bytes round-trip as a null stream
    let internal toInternal struct (encoding, encodedBody : ReadOnlyMemory<byte>) : InternalBody =
        { encoding = encoding
          data = match encodedBody with d when d.IsEmpty -> null | d -> new MemoryStream(d.ToArray(), writable = false) }
// We only capture the total RUs, without attempting to split them into read/write as the service does not actually populate the associated fields
// See https://github.com/aws/aws-sdk-go/issues/2699#issuecomment-514378464
[<Struct>]
type RequestConsumption = { total : float }
/// Direction of traversal when reading a stream's Batches
[<Struct; RequireQualifiedAccess>]
type Direction = Forward | Backward override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"
/// Defines the store's metric event types, their attachment to Serilog events, and in-process aggregation/reporting
module Log =
    /// <summary>Name of Property used for <c>Metric</c> in <c>LogEvent</c>s.</summary>
    let [<Literal>] PropertyTag = "ddbEvt"
    /// Core measurements captured for a single store roundtrip
    [<NoEquality; NoComparison>]
    type Measurement =
        { table : string; stream : string
          interval : StopwatchInterval; bytes : int; count : int; ru : float }
        member x.Category = StreamName.category (FSharp.UMX.UMX.tag x.stream)
    let inline metric table stream t bytes count rc : Measurement =
        { table = table; stream = stream; interval = t; bytes = bytes; count = count; ru = rc.total }
    [<RequireQualifiedAccess; NoEquality; NoComparison>]
    type Metric =
        /// Individual read request for the Tip
        | Tip of Measurement
        /// Individual read request for the Tip, not found
        | TipNotFound of Measurement
        /// Tip read but etag unchanged, signifying payload not inspected as it can be trusted not to have been altered
        /// (NOTE the read is still fully charged on Dynamo, as opposed to Cosmos where it is 1 RU regardless of size)
        | TipNotModified of Measurement
        /// Summarizes a set of Responses for a given Read request
        | Query of Direction * responses : int * Measurement
        /// Individual read request in a Batch
        /// Charges are rolled up into Query Metric (so do not double count)
        | QueryResponse of Direction * Measurement
        | SyncAppend of Measurement
        | SyncCalve of Measurement
        | SyncAppendConflict of Measurement
        | SyncCalveConflict of Measurement
        /// Summarizes outcome of request to trim batches from head of a stream and events in Tip
        /// count in Measurement is number of batches (documents) deleted
        /// bytes in Measurement is number of events deleted
        | Prune of responsesHandled : int * Measurement
        /// Handled response from listing of batches in a stream
        /// Charges are rolled up into the Prune Metric (so do not double count)
        | PruneResponse of Measurement
        /// Deleted an individual Batch
        | Delete of Measurement
        /// Trimmed the Tip
        | Trim of Measurement
    /// Extracts a Metric from a LogEvent's properties, if one was attached via `event`
    let [<return: Struct>] (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric voption =
        let mutable p = Unchecked.defaultof<_>
        logEvent.Properties.TryGetValue(PropertyTag, &p) |> ignore
        match p with Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone
    /// Attach a property to the captured event record to hold the metric information
    let internal event (value : Metric) = Internal.Log.withScalarProperty PropertyTag value
    let internal prop name value (log : ILogger) = log.ForContext(name, value)
    [<RequireQualifiedAccess>]
    type Operation = Tip | Tip404 | Tip304 | Query | Append | Calve | AppendConflict | CalveConflict | Prune | Delete | Trim
    /// Classifies a Metric into its Operation; QueryRes/PruneRes identify per-response items whose charges must not be double counted
    let (|Op|QueryRes|PruneRes|) = function
        | Metric.Tip s -> Op (Operation.Tip, s)
        | Metric.TipNotFound s -> Op (Operation.Tip404, s)
        | Metric.TipNotModified s -> Op (Operation.Tip304, s)
        | Metric.Query (_, _, s) -> Op (Operation.Query, s)
        | Metric.QueryResponse (direction, s) -> QueryRes (direction, s)
        | Metric.SyncAppend s -> Op (Operation.Append, s)
        | Metric.SyncCalve s -> Op (Operation.Calve, s)
        | Metric.SyncAppendConflict s -> Op (Operation.AppendConflict, s)
        | Metric.SyncCalveConflict s -> Op (Operation.CalveConflict, s)
        | Metric.Prune (_, s) -> Op (Operation.Prune, s)
        | Metric.PruneResponse s -> PruneRes s
        | Metric.Delete s -> Op (Operation.Delete, s)
        | Metric.Trim s -> Op (Operation.Trim, s)
    module InternalMetrics =
        module Stats =
            /// Thread-safe accumulator; RU values are held x100 so they can be accumulated as integers via Interlocked
            type internal Counter =
                { mutable rux100 : int64; mutable count : int64; mutable ms : int64 }
                static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
                member x.Ingest(ms, ru) =
                    Interlocked.Increment(&x.count) |> ignore
                    Interlocked.Add(&x.rux100, int64 (ru * 100.)) |> ignore
                    Interlocked.Add(&x.ms, ms) |> ignore
            /// Maintains a Counter per table name
            type internal Counters() =
                let tables = System.Collections.Concurrent.ConcurrentDictionary<string, Counter>()
                let create (_name : string) = Counter.Create()
                member _.Ingest(table, ms, ru) = tables.GetOrAdd(table, create).Ingest(ms, ru)
                member _.Tables = tables.Keys
                member _.TryTable table = match tables.TryGetValue table with true, t -> Some t | false, _ -> None
            /// Aggregates per-Operation Counters over a period, together with the elapsed time since creation
            type Epoch() =
                let epoch = System.Diagnostics.Stopwatch.StartNew()
                member val internal Tip = Counters() with get, set
                member val internal Query = Counters() with get, set
                member val internal Append = Counters() with get, set
                member val internal Calve = Counters() with get, set
                member val internal Append409 = Counters() with get, set
                member val internal Calve409 = Counters() with get, set
                member val internal Prune = Counters() with get, set
                member val internal Delete = Counters() with get, set
                member val internal Trim = Counters() with get, set
                member _.Stop() = epoch.Stop()
                member _.Elapsed = epoch.Elapsed
            // Destructures a Measurement into (table, elapsed ms, ru) for ingestion
            let inline private (|TableMsRu|) ({ table = t; interval = i; ru = ru } : Measurement) =
                t, int64 i.ElapsedMilliseconds, ru
            /// Serilog sink that feeds store Metrics into a (restartable) Epoch of Counters
            type LogSink() =
                static let mutable epoch = Epoch()
                /// Atomically swaps in a fresh Epoch; stops and yields the outgoing one for reporting
                static member Restart() =
                    let fresh = Epoch()
                    let outgoing = Interlocked.Exchange(&epoch, fresh)
                    outgoing.Stop()
                    outgoing
                interface Serilog.Core.ILogEventSink with
                    member _.Emit logEvent =
                        match logEvent with
                        | MetricEvent cm ->
                            match cm with
                            | Op ((Operation.Tip | Operation.Tip404 | Operation.Tip304), TableMsRu m) ->
                                epoch.Tip.Ingest m
                            | Op (Operation.Query, TableMsRu m) -> epoch.Query.Ingest m
                            // Per-response charges are already rolled into the Query/Prune totals
                            | QueryRes (_direction, _) -> ()
                            | Op (Operation.Append, TableMsRu m) -> epoch.Append.Ingest m
                            | Op (Operation.Calve, TableMsRu m) -> epoch.Calve.Ingest m
                            | Op (Operation.AppendConflict, TableMsRu m) -> epoch.Append409.Ingest m
                            | Op (Operation.CalveConflict, TableMsRu m) -> epoch.Calve409.Ingest m
                            | Op (Operation.Prune, TableMsRu m) -> epoch.Prune.Ingest m
                            | PruneRes _ -> ()
                            | Op (Operation.Delete, TableMsRu m) -> epoch.Delete.Ingest m
                            | Op (Operation.Trim, TableMsRu m) -> epoch.Trim.Ingest m
                        | _ -> ()
        /// Relies on feeding of metrics from Log through to Stats.LogSink
        /// Use Stats.LogSink.Restart() to reset the start point (and stats) where relevant
        let dump (log : ILogger) =
            let res = Stats.LogSink.Restart()
            let stats =
                [ nameof res.Tip, res.Tip
                  nameof res.Query, res.Query
                  nameof res.Append, res.Append
                  nameof res.Append409, res.Append409
                  nameof res.Calve, res.Calve
                  nameof res.Calve409, res.Calve409
                  nameof res.Prune, res.Prune
                  nameof res.Delete, res.Delete
                  nameof res.Trim, res.Trim ]
            // Emit one summary line per (table, operation), followed by per-table totals and periodic rates
            for table in stats |> Seq.collect (fun (_n, stat) -> stat.Tables) |> Seq.distinct |> Seq.sort do
                let mutable rows, totalCount, totalRRu, totalWRu, totalMs = 0, 0L, 0., 0., 0L
                let logActivity name count ru lat =
                    let aru, ams = (if count = 0L then Double.NaN else ru/float count), (if count = 0L then Double.NaN else float lat/float count)
                    // Tip/Query/Prune are reads; everything else is a write (side effect: accumulates the R/W totals)
                    let rut = name |> function
                        | "TOTAL" -> "" | nameof res.Tip | nameof res.Query | nameof res.Prune -> totalRRu <- totalRRu + ru; "R"
                        | _ -> totalWRu <- totalWRu + ru; "W"
                    log.Information("{table} {name}: {count:n0}r {ru:n0}{rut:l}CU Average {avgRu:n1}CU {lat:n0}ms", table, name, count, ru, rut, aru, ams)
                for name, stat in stats do
                    match stat.TryTable table with
                    | Some stat when stat.count <> 0L ->
                        let ru = float stat.rux100 / 100.
                        totalCount <- totalCount + stat.count
                        totalMs <- totalMs + stat.ms
                        logActivity name stat.count ru stat.ms
                        rows <- rows + 1
                    | _ -> ()
                if rows > 1 then logActivity "TOTAL" totalCount (totalRRu + totalWRu) totalMs
                let measures : (string * (TimeSpan -> float)) list = [ "s", fun x -> x.TotalSeconds(*; "m", fun x -> x.TotalMinutes; "h", fun x -> x.TotalHours*) ]
                let logPeriodicRate name count rru wru = log.Information("{table} {rru:n1}R/{wru:n1}W CU @ {count:n0} rp{unit}", table, rru, wru, count, name)
                for uom, f in measures do let d = f res.Elapsed in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRRu/d) (totalWRu/d)
/// Helpers for verifying/creating/provisioning the backing DynamoDB table
module Initialization =
    open Amazon.DynamoDBv2
    /// Verifies the table schema; creates the table (with the given throughput) when one is supplied and the table is absent
    let private prepare (client : IAmazonDynamoDB) tableName maybeThroughput : Async<unit> =
        let context = TableContext<Batch.Schema>(client, tableName)
        match maybeThroughput with Some throughput -> context.VerifyOrCreateTableAsync(throughput) |> Async.Ignore | None -> context.VerifyTableAsync()
    /// Verify the specified <c>tableName</c> is present and adheres to the correct schema.
    let verify (client : IAmazonDynamoDB) tableName : Async<unit> =
        let context = TableContext<Batch.Schema>(client, tableName)
        context.VerifyTableAsync()
    [<RequireQualifiedAccess>]
    type StreamingMode = Off | New | NewAndOld
    /// Maps the local StreamingMode to the FSharp.AWS.DynamoDB Streaming configuration
    let toStreaming = function
        | StreamingMode.Off -> Streaming.Disabled
        | StreamingMode.New -> Streaming.Enabled StreamViewType.NEW_IMAGE
        | StreamingMode.NewAndOld -> Streaming.Enabled StreamViewType.NEW_AND_OLD_IMAGES
    /// Create the specified <c>tableName</c> if it does not exist. Will throw if it exists but the schema mismatches.
    let createIfNotExists (client : IAmazonDynamoDB) tableName (throughput, streamingMode) : Async<unit> =
        let context = TableContext<Batch.Schema>(client, tableName)
        context.VerifyOrCreateTableAsync(throughput, toStreaming streamingMode) |> Async.Ignore
    /// Provision (or re-provision) the specified table with the specified <c>Throughput</c>. Will throw if schema mismatches.
    let provision (client : IAmazonDynamoDB) tableName (throughput, streamingMode) = async {
        let context = TableContext<Batch.Schema>(client, tableName)
        let! desc = context.VerifyOrCreateTableAsync(throughput, toStreaming streamingMode)
        return! context.UpdateTableIfRequiredAsync(throughput, toStreaming streamingMode, currentTableDescription = desc) }
    /// Yields result of <c>DescribeTable</c>; Will throw if table does not exist, or creation is in progress
    // NOTE(review): implemented via UpdateTableIfRequiredAsync with no arguments — presumably a describe-only call when
    // no throughput/streaming change is requested; confirm against FSharp.AWS.DynamoDB docs
    let describe (client : IAmazonDynamoDB) tableName : Async<Model.TableDescription> =
        let context = TableContext<Batch.Schema>(client, tableName)
        context.UpdateTableIfRequiredAsync()
    /// Yields the <c>StreamsARN</c> if (but only if) it streaming is presently active
    let tryGetActiveStreamsArn (x : Model.TableDescription) =
        match x.StreamSpecification with
        | ss when ss <> null && ss.StreamEnabled -> x.LatestStreamArn
        | _ -> null
/// Accumulates the CapacityUnits reported across a sequence of DynamoDB request metrics callbacks
type private Metrics() =
    let mutable accumulated = 0.
    /// Callback suitable for use as a TableContext metricsCollector; folds each reported capacity into the running total
    member _.Add(x : RequestMetrics) =
        x.ConsumedCapacity |> Seq.iter (fun cc -> accumulated <- accumulated + cc.CapacityUnits)
    /// Snapshot of the total capacity consumed so far
    member _.Consumed : RequestConsumption = { total = accumulated }
module private Async =
    /// Starts the computation immediately on the current thread as a Task, wired to the supplied CancellationToken
    let startImmediateAsTask ct computation = Async.StartImmediateAsTask(computation, cancellationToken = ct)
module private Stopwatch =
    /// Runs the computation as a Task, yielding the elapsed interval alongside the result
    let timeAsync (ct : CancellationToken) (f : Async<'T>) : Task<struct (StopwatchInterval * 'T)> =
        (fun ct -> Async.startImmediateAsTask ct f) |> Stopwatch.time ct
/// Lightweight projection of a Batch: whether it is the Tip, its base index, and its end index
type internal BatchIndices = { isTip : bool; index : int64; n : int64 }
/// Wraps a TableContext factory so each request gets a fresh metrics collector whose consumption can be reported per call
type Container(tableName, createContext : (RequestMetrics -> unit) -> TableContext<Batch.Schema>) =
    member _.Context(collector) = createContext collector
    member _.TableName = tableName
    /// As per Equinox.CosmosStore, we assume the table to be provisioned correctly (see DynamoStoreClient.Connect(ConnectMode) re validating on startup)
    static member Create(client, tableName) =
        let createContext collector = TableContext<Batch.Schema>(client, tableName, metricsCollector = collector)
        Container(tableName, createContext)
    /// Reads the Tip Item for the stream, if present; yields the consumed capacity either way
    member x.TryGetTip(stream : string, consistentRead, ct) : Task<Batch option * RequestConsumption> = task {
        let rm = Metrics()
        let context = createContext rm.Add
        let pk = Batch.tableKeyForStreamTip stream
        let! item = context.TryGetItemAsync(pk, consistentRead) |> Async.startImmediateAsTask ct
        return item |> Option.map Batch.ofSchema, rm.Consumed }
    /// Applies the supplied update expression to the Tip Item, optionally gated on a precondition
    member x.TryUpdateTip(stream : string, updateExpr : Quotations.Expr<Batch.Schema -> Batch.Schema>, ct, ?precondition) : Task<Batch * RequestConsumption> = task {
        let rm = Metrics()
        let context = createContext rm.Add
        let pk = Batch.tableKeyForStreamTip stream
        let! item = context.UpdateItemAsync(pk, updateExpr, ?precondition = precondition) |> Async.startImmediateAsTask ct
        return item |> Batch.ofSchema, rm.Consumed }
    /// Pages through the stream's Batches (optionally bounded by maxI / filtered by minN), following LastEvaluatedKey recursively
    member _.QueryBatches(stream, consistentRead, minN, maxI, backwards, batchSize, ct) : IAsyncEnumerable<int * StopwatchInterval * Batch array * RequestConsumption> =
        let compile = (createContext ignore).Template.PrecomputeConditionalExpr
        let kc = match maxI with
                 | Some maxI -> compile <@ fun (b : Batch.Schema) -> b.p = stream && b.i < maxI @>
                 | None -> compile <@ fun (b : Batch.Schema) -> b.p = stream @>
        let fc = match minN with
                 | Some minN -> compile <@ fun (b : Batch.Schema) -> b.n > minN @> |> Some
                 | None -> None
        let rec aux (i, le) = taskSeq {
            // TOCONSIDER could avoid projecting `p`
            let rm = Metrics()
            let context = createContext rm.Add
            let! t, res = context.QueryPaginatedAsync(kc, ?filterCondition = fc, limit = batchSize, ?exclusiveStartKey = le,
                                                     scanIndexForward = not backwards, consistentRead = consistentRead)
                          |> Stopwatch.timeAsync ct
            yield i, t, Array.map Batch.ofSchema res.Records, rm.Consumed
            match res.LastEvaluatedKey with
            | None -> ()
            | le -> yield! aux (i + 1, le) }
        aux (0, None)
    /// Pages through the stream in ascending `i` order, projecting only index information (no event bodies hydrated)
    member internal _.QueryIAndNOrderByNAscending(stream, maxItems, ct) : IAsyncEnumerable<int * StopwatchInterval * BatchIndices array * RequestConsumption> =
        let rec aux (index, lastEvaluated) = taskSeq {
            let rm = Metrics()
            let context = createContext rm.Add
            let keyCond = <@ fun (b : Batch.Schema) -> b.p = stream @>
            let proj = <@ fun (b : Batch.Schema) -> b.i, b.c, b.n @> // TOCONSIDER want len of c, but b.e.Length explodes in empty array case, so no choice but to return the full thing
            let! t, res = context.QueryProjectedPaginatedAsync(keyCond, proj, ?exclusiveStartKey = lastEvaluated, scanIndexForward = true, limit = maxItems)
                          |> Stopwatch.timeAsync ct
            yield index, t, [| for i, c, n in res -> { isTip = Batch.isTip i; index = n - int64 c.Length; n = n } |], rm.Consumed
            match res.LastEvaluatedKey with
            | None -> ()
            | le -> yield! aux (index + 1, le) }
        aux (0, None)
    /// Deletes the Item with the specified range key `i`; yields the consumed capacity
    member x.DeleteItem(stream : string, i, ct) : Task<RequestConsumption> = task {
        let rm = Metrics()
        let context = createContext rm.Add
        let pk = TableKey.Combined(stream, i)
        let! _item = context.DeleteItemAsync(pk) |> Async.startImmediateAsTask ct
        return rm.Consumed }
/// Represents the State of the Stream for the purposes of deciding how to map a Sync request to DynamoDB operations
[<NoComparison; NoEquality>]
type Position =
    { index : int64; etag : string; calvedBytes : int; baseBytes : int; unfoldsBytes : int; events : Event array }
    override x.ToString() = sprintf "{ n=%d; etag=%s; e=%d; b=%d+%d }" x.index x.etag x.events.Length x.baseBytes x.unfoldsBytes
module internal Position =
    // NOTE a write of Some 0 to x.b round-trips as None
    /// Derives a Position from a Tip Batch (its `n`, etag, events and size estimates)
    let fromTip (x : Batch) = { index = x.n; etag = x.etag; events = x.e; calvedBytes = defaultArg x.b 0; baseBytes = Batch.bytesBase x; unfoldsBytes = Batch.bytesUnfolds x }
    /// Builds a Position from raw element values by synthesizing a Tip Batch (the `i` value is irrelevant here)
    let fromElements (p, b, n, e, u, etag) = fromTip { p = p; b = Some b; i = Unchecked.defaultof<_>; n = n; e = e; u = u; etag = etag }
    /// Yields a Position only when the supplied Batch is the Tip
    let tryFromBatch (x : Batch) = if Batch.isTip x.i then fromTip x |> Some else None
    let toIndex = function Some p -> p.index | None -> 0
    let toEtag = function Some p -> p.etag | None -> null
    let toVersionAndStreamBytes = function Some p -> p.index, p.calvedBytes + p.baseBytes | None -> 0, 0
    /// An empty Position at the specified index (no etag, no events, zero sizes)
    let null_ i = { index = i; etag = null; calvedBytes = 0; baseBytes = 0; unfoldsBytes = 0; events = Array.empty }
    let flatten = function Some p -> p | None -> null_ 0
module internal Sync =
    /// Identifies the exceptions FSharp.AWS.DynamoDB raises when a conditional write loses to a concurrent update
    let private (|DynamoDbConflict|_|) : exn -> _ = function
        | Precondition.CheckFailed
        | TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> Some ()
        | _ -> None
    // NOTE(review): `template` is not defined within this chunk — presumably a module-level RecordTemplate
    // for Batch.Schema declared earlier in the file; confirm when viewing the full source
    let private cce : Quotations.Expr<Batch.Schema -> bool> -> ConditionExpression<Batch.Schema> = template.PrecomputeConditionalExpr
    let private cue (ue : Quotations.Expr<Batch.Schema -> Batch.Schema>) = template.PrecomputeUpdateExpr ue
    /// Condition asserting the target Item does not yet exist (used to guard inserts)
    let private batchDoesNotExistCondition = cce <@ fun t -> NOT_EXISTS t.i @>
    let private putItemIfNotExists item = TransactWrite.Put (item, Some batchDoesNotExistCondition)
    /// Conditional update targeting the stream's Tip Item
    let private updateTip stream updater cond = TransactWrite.Update (Batch.tableKeyForStreamTip stream, Some (cce cond), cue updater)
    /// The shape of a Sync request: either append events into the Tip, or calve a Batch off the Tip while updating it
    [<RequireQualifiedAccess; NoComparison; NoEquality>]
    type Req =
        | Append of tipWasEmpty : bool * events : Event array
        | Calve of calfEvents : Event array * updatedTipEvents : Event array * appendedCount : int
    /// Concurrency-control expectation the write is predicated on: a specific version (`n`) or a specific etag
    [<RequireQualifiedAccess>]
    type internal Exp =
        | Version of int64
        | Etag of string
    /// Computes the TransactWrite operations for the request: an optional calf insert, plus either a fresh Tip insert
    /// (when the stream is empty per `exp`) or a conditional Tip update predicated on the expected etag/version
    let private generateRequests (stream : string) (req, u, exp, b', n') etag'
        : TransactWrite<Batch.Schema> list =
        let u = Batch.unfoldsToSchema u
        // TOCONSIDER figure out way to remove special casing (replaceTipEvents): Array.append to empty e/c triggers exception
        let replaceTipEvents, tipA, (tipC, tipE), (maybeCalf : Batch.Schema option) =
            match req with
            | Req.Append (tipWasEmpty, eventsToAppendToTip) ->
                // When the Tip held no events, we replace e/c outright rather than Array.append-ing (see TOCONSIDER above)
                let replaceTipEvents = tipWasEmpty && eventsToAppendToTip.Length <> 0
                replaceTipEvents, eventsToAppendToTip.Length, Batch.eventsToSchema eventsToAppendToTip, None
            | Req.Calve (calf, tipUpdatedEvents, freshEventsCount) ->
                // Split the freshly-appended count between the calf Item and the updated Tip
                let tipA = min tipUpdatedEvents.Length freshEventsCount
                let calf : Batch.Schema =
                    let calfA = freshEventsCount - tipA
                    let calfC, calfE = Batch.eventsToSchema calf
                    let tipIndex = n' - tipUpdatedEvents.LongLength
                    let calfIndex = tipIndex - calf.LongLength
                    { p = stream; i = calfIndex; a = calfA; b = None; etag = None; u = [||]; c = calfC; e = calfE; n = tipIndex }
                true, tipA, (Batch.eventsToSchema tipUpdatedEvents), Some calf
        // A brand new Tip Item carrying the new etag, byte count, events and unfolds
        let genFreshTipItem () : Batch.Schema =
            { p = stream; i = Batch.tipMagicI; a = tipA; b = Some b'; etag = Some etag'; u = u; n = n'; e = tipE; c = tipC }
        let updateTipIf condExpr =
            let updExpr : Quotations.Expr<Batch.Schema -> Batch.Schema> =
                if replaceTipEvents then <@ fun t -> { t with a = tipA; b = Some b'; etag = Some etag'; u = u; n = n'; e = tipE; c = tipC } @>
                // No events to add: only refresh the unfolds/etag/byte count (a = 0 flags no fresh events for DDB Streams consumers)
                elif tipE.Length = 0 then <@ fun t -> { t with a = 0; b = Some b'; etag = Some etag'; u = u } @>
                else <@ fun t -> { t with a = tipA; b = Some b'; etag = Some etag'; u = u; n = n'; e = Array.append t.e tipE; c = Array.append t.c tipC } @>
            updateTip stream updExpr condExpr
        [ match maybeCalf with
          | Some calfItem -> putItemIfNotExists calfItem
          | None -> ()
          match exp with
          // Empty-stream expectation -> the Tip must not exist yet; insert it
          | Exp.Version 0L | Exp.Etag null -> putItemIfNotExists (genFreshTipItem ())
          | Exp.Etag etag -> updateTipIf <@ fun t -> t.etag = Some etag @>
          | Exp.Version ver -> updateTipIf <@ fun t -> t.n = ver @> ]
    /// Outcome of a Sync attempt: the write landed (with its new etag), or a conditional check failed (loser of a race)
    [<RequireQualifiedAccess; NoEquality; NoComparison>]
    type private Res =
        | Written of etag' : string
        | ConflictUnknown
    /// Executes the computed writes: a lone Put/Update is issued directly; multiple actions go via TransactWriteItems
    let private transact (container : Container, stream : string) requestArgs ct : Task<struct (RequestConsumption * Res)> = task {
        // Fresh etag for the updated Tip, generated here so it can be reported on success
        let etag' = let g = Guid.NewGuid() in g.ToString "N"
        let actions = generateRequests stream requestArgs etag'
        let rm = Metrics()
        try do! let context = container.Context(rm.Add)
                match actions with
                | [ TransactWrite.Put (item, Some cond) ] -> context.PutItemAsync(item, cond) |> Async.Ignore
                | [ TransactWrite.Update (key, Some cond, updateExpr) ] -> context.UpdateItemAsync(key, updateExpr, cond) |> Async.Ignore
                | actions -> context.TransactWriteItems actions |> Async.Ignore
                |> Async.startImmediateAsTask ct
            return struct (rm.Consumed, Res.Written etag')
        with DynamoDbConflict ->
            // Conditional check failed: someone else won; consumed capacity is still reported
            return rm.Consumed, Res.ConflictUnknown }
// Wraps `transact`, timing it and rendering the outcome (with rich metrics) to the log
// in either Append or Calve form depending on the request shape
let private transactLogged (container, stream) (baseBytes, baseEvents, req, unfolds, exp, b', n', ct) (log : ILogger)
    : Task<Res> = task {
    let! t, ({ total = ru } as rc, result) = transact (container, stream) (req, unfolds, exp, b', n') |> Stopwatch.time ct
    // Project the request into the figures the log messages render; `appended` is the newly written events only
    let calfBytes, calfCount, tipBytes, tipEvents, appended = req |> function
        | Req.Append (_tipWasEmpty, appends) -> 0, 0, baseBytes + Event.arrayBytes appends, baseEvents + appends.Length, appends
        | Req.Calve (calf, tip, appendedCount) -> Event.arrayBytes calf, calf.Length, baseBytes + Event.arrayBytes tip, tip.Length,
                                                  Seq.append calf tip |> Seq.skip (calf.Length + tip.Length - appendedCount) |> Seq.toArray
    // Render the concurrency-control expectation (etag or version) for the log message
    let exp, log = exp |> function
        | Exp.Etag etag -> "e="+etag, log |> Log.prop "expectedEtag" etag
        | Exp.Version ev -> "v="+string ev, log |> Log.prop "expectedVersion" ev
    let outcome, log =
        let reqMetric = Log.metric container.TableName stream t (calfBytes + tipBytes) (appended.Length + unfolds.Length) rc
        match result with
        | Res.Written etag' -> "OK", log |> Log.event ((if calfBytes = 0 then Log.Metric.SyncAppend else Log.Metric.SyncCalve) reqMetric)
                                         |> Log.prop "nextPos" n'
                                         |> Log.prop "nextEtag" etag'
        | Res.ConflictUnknown -> "Conflict", log |> Log.event ((if calfBytes = 0 then Log.Metric.SyncAppendConflict else Log.Metric.SyncCalveConflict) reqMetric)
                                                 |> Log.prop "conflict" true
                                                 |> Log.prop "eventTypes" (Seq.truncate 5 (seq { for x in appended -> x.c }))
    let appendedBytes, unfoldsBytes = Event.arrayBytes appended, Unfold.arrayBytes unfolds
    if calfBytes <> 0 then
        log.Information("EqxDynamo {action:l}{act:l} {outcome:l} {stream:l} {ms:f1}ms {ru}RU {exp:l} {appendedE}e {appendedB}b Tip {baseE}->{tipE}e {baseB}->{tipB}b Unfolds {unfolds} {unfoldsBytes}b Calf {calfEvents} {calfBytes}b",
            "Sync", "Calve", outcome, stream, t.ElapsedMilliseconds, ru, exp, appended.Length, appendedBytes, baseEvents, tipEvents, baseBytes, tipBytes, unfolds.Length, unfoldsBytes, calfCount, calfBytes)
    else log.Information("EqxDynamo {action:l}{act:l} {outcome:l} {stream:l} {ms:f1}ms {ru}RU {exp:l} {appendedE}e {appendedB}b Events {events} {tipB}b Unfolds {unfolds} {unfoldsB}b",
            "Sync", "Append", outcome, stream, t.ElapsedMilliseconds, ru, exp, appended.Length, appendedBytes, tipEvents, tipBytes, unfolds.Length, unfoldsBytes)
    return result }
// Public outcome of a sync, as surfaced to the caller of `handle`
[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Result =
    // Write succeeded: new Tip etag, bytes residing in calved (non-Tip) Batches, and the resulting Tip events + unfolds
    | Written of etag : string * predecessorBytes : int * events : Event array * unfolds : Unfold array
    // A concurrent write intercepted ours; the actual stream state is unknown without a reload
    | ConflictUnknown
// DynamoDB's hard limit on the size of a single item (400 KB); a calved Batch must stay within this
let private maxDynamoDbItemSize = 400 * 1024
// Entry point for a sync: maps the incoming IEventData items to the internal Event/Unfold schema,
// decides between a plain Append and a Calve (when the Tip would exceed the maxEvents/maxBytes budgets),
// then executes the write and interprets the outcome
let handle log (maxEvents, maxBytes) (container, stream)
        (pos, exp, n', events : IEventData<EncodedBody> array, unfolds : IEventData<EncodedBody> array, ct) = task {
    // n' is the index after the last event being written; derive each event's absolute index from it
    let baseIndex = int n' - events.Length
    let events : Event array = events |> Array.mapi (fun i e ->
        { i = baseIndex + i; t = e.Timestamp; c = e.EventType; d = EncodedBody.toInternal e.Data; m = EncodedBody.toInternal e.Meta
          correlationId = Option.ofObj e.CorrelationId; causationId = Option.ofObj e.CausationId })
    let unfolds : Unfold array = unfolds |> Array.map (fun (x : IEventData<_>) ->
        { i = n'; t = x.Timestamp; c = x.EventType; d = EncodedBody.toInternal x.Data; m = EncodedBody.toInternal x.Meta })
    if Array.isEmpty events && Array.isEmpty unfolds then invalidOp "Must write either events or unfolds."
    let cur = Position.flatten pos
    let req, predecessorBytes', tipEvents' =
        // Calve when the event-count limit would be breached, or the Tip item would exceed the byte budget
        let eventOverflow = maxEvents |> Option.exists (fun limit -> events.Length + cur.events.Length > limit)
        if eventOverflow || cur.baseBytes + Unfold.arrayBytes unfolds + Event.arrayBytes events > maxBytes then
            // Move as many leading events as fit under the DynamoDB item limit into a Calf Batch; the rest stay in Tip
            let calfEvents, residualEvents = ResizeArray(cur.events.Length + events.Length), ResizeArray()
            let mutable calfFull, calfSize = false, 1024 // 1024: presumably headroom for the item's non-event fields - TODO confirm
            for e in Seq.append cur.events events do
                match calfFull, calfSize + Event.Bytes e with
                | false, calfSize' when calfSize' < maxDynamoDbItemSize -> calfSize <- calfSize'; calfEvents.Add e
                | _ -> calfFull <- true; residualEvents.Add e
            let calfEvents = calfEvents.ToArray()
            let tipEvents = residualEvents.ToArray()
            Req.Calve (calfEvents, tipEvents, events.Length), cur.calvedBytes + Event.arrayBytes calfEvents, tipEvents
        else Req.Append (Array.isEmpty cur.events, events), cur.calvedBytes, Array.append cur.events events
    match! transactLogged (container, stream) (cur.baseBytes, cur.events.Length, req, unfolds, exp pos, predecessorBytes', n', ct) log with
    | Res.ConflictUnknown -> return Result.ConflictUnknown
    | Res.Written etag' -> return Result.Written (etag', predecessorBytes', tipEvents', unfolds) }
// Reading and interpretation of the Tip item (the stream's head Batch, carrying recent events plus unfolds)
module internal Tip =

    [<RequireQualifiedAccess>]
    type Res<'T> =
        | Found of 'T
        | NotFound
        | NotModified
    // Reads the Tip, mapping the raw result to Found / NotFound / NotModified (when the cached etag still matches)
    let private get (container : Container, stream : string) consistentRead (maybePos : Position option) ct = task {
        match! container.TryGetTip(stream, consistentRead, ct) with
        | Some { etag = fe }, rc when fe = Position.toEtag maybePos -> return rc, Res.NotModified
        | Some t, rc -> return rc, Res.Found t
        | None, rc -> return rc, Res.NotFound }
    // Wraps `get`, emitting metrics and log lines styled on HTTP status codes (200 / 304 / 404)
    let private loggedGet (get : Container * string -> bool -> Position option -> CancellationToken -> Task<_>) (container, stream) consistentRead (maybePos : Position option) (log : ILogger) ct = task {
        let log = log |> Log.prop "stream" stream
        let! t, ({ total = ru } as rc, res : Res<_>) = get (container, stream) consistentRead maybePos |> Stopwatch.time ct
        let logMetric bytes count (f : Log.Measurement -> _) = log |> Log.event (f (Log.metric container.TableName stream t bytes count rc))
        match res with
        | Res.NotModified ->
            (logMetric 0 0 Log.Metric.TipNotModified).Information("EqxDynamo {action:l} {res} {stream:l} {ms:f1}ms {ru}RU",
                "Tip", 304, stream, t.ElapsedMilliseconds, ru)
        | Res.NotFound ->
            (logMetric 0 0 Log.Metric.TipNotFound).Information("EqxDynamo {action:l} {res} {stream:l} {ms:f1}ms {ru}RU",
                "Tip", 404, stream, t.ElapsedMilliseconds, ru)
        | Res.Found tip ->
            let eventsCount, unfoldsCount, bb, ub = tip.e.Length, tip.u.Length, Batch.bytesBase tip, Batch.bytesUnfolds tip
            let log = logMetric (bb + ub) (eventsCount + unfoldsCount) Log.Metric.Tip
            let log = match maybePos with Some p -> log |> Log.prop "startPos" p |> Log.prop "startEtag" p | None -> log
            log.Information("EqxDynamo {action:l} {res} {stream:l} v{n} {ms:f1}ms {ru}RU {etag} {events}e {unfolds}u {baseBytes}+{unfoldsBytes}b",
                "Tip", 200, stream, tip.n, t.ElapsedMilliseconds, ru, tip.etag, eventsCount, unfoldsCount, bb, ub)
        return ru, res }
    // Flattens the Tip's events (filtered to the requested index window) and its unfolds into one Index-ordered array
    let private enumEventsAndUnfolds (minIndex, maxIndex) (x : Batch) : ITimelineEvent<InternalBody> array =
        Seq.append<ITimelineEvent<_>> (Batch.enumEvents (minIndex, maxIndex) x |> Seq.cast) (x.u |> Seq.cast)
        // where Index is equal, unfolds get delivered after the events so the fold semantics can be 'idempotent'
        |> Seq.sortBy (fun x -> x.Index, x.IsUnfold)
        |> Array.ofSeq
    /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with Result.NotModified
    let tryLoad (log : ILogger) containerStream consistentRead (maybePos : Position option, maxIndex) ct : Task<Res<Position * int64 * ITimelineEvent<InternalBody> array>> = task {
        let! _rc, res = loggedGet get containerStream consistentRead maybePos log ct
        match res with
        | Res.NotModified -> return Res.NotModified
        | Res.NotFound -> return Res.NotFound
        | Res.Found tip ->
            // When resuming from a cached Position, events below that index are already held by the caller
            let minIndex = maybePos |> Option.map (fun x -> x.index)
            return Res.Found (Position.fromTip tip, Batch.baseIndex tip, tip |> enumEventsAndUnfolds (minIndex, maxIndex)) }
// Traversal of the non-Tip Batches via paged queries over the stream's partition
module internal Query =

    // Issues the underlying paged query for the stream in the requested direction and index window
    let private mkQuery (log : ILogger) (container : Container, stream : string) consistentRead maxItems (direction : Direction, minIndex, maxIndex) ct =
        let minN, maxI = minIndex, maxIndex
        log.Debug("EqxDynamo Query {stream}; n>{minIndex} i<{maxIndex}", stream, Option.toNullable minIndex, Option.toNullable maxIndex)
        container.QueryBatches(stream, consistentRead, minN, maxI, (direction = Direction.Backward), maxItems, ct)
    // Unrolls the Batches in a response
    // NOTE when reading backwards, the events are emitted in reverse Index order to suit the takeWhile consumption
    let private mapPage direction (container : Container, stream : string) (minIndex, maxIndex, maxItems) (maxRequests : int option)
        (log : ILogger) (i, t, batches : Batch array, rc)
        : Event array * Position option * RequestConsumption =
        // Enforce the caller-specified cap on query roundtrips
        match maxRequests with
        | Some mr when i >= mr -> log.Information("EqxDynamo {action:l} Page {page} Limit exceeded", i); invalidOp "batch Limit exceeded"
        | _ -> ()
        let unwrapBatch (x : Batch) =
            Batch.enumEvents (minIndex, maxIndex) x
            |> if direction = Direction.Backward then Seq.rev else id
        let events = batches |> Seq.collect unwrapBatch |> Array.ofSeq
        let usedEventsCount, usedBytes, totalBytes = events.Length, Event.arrayBytes events, Batch.bytesTotal batches
        // Nullable () (i.e. null) renders as blank in the log when the page yielded no usable events
        let baseIndex = if usedEventsCount = 0 then Nullable () else Nullable (Seq.map Batch.baseIndex batches |> Seq.min)
        let minI, maxI = match events with [||] -> Nullable (), Nullable () | xs -> Nullable events[0].i, Nullable events[xs.Length - 1].i
        (log|> Log.event (Log.Metric.QueryResponse (direction, Log.metric container.TableName stream t totalBytes usedEventsCount rc)))
            .Information("EqxDynamo {action:l} {page} {minIndex}-{maxIndex} {ms:f1}ms {ru}RU {batches}/{batchSize}@{index} {count}e {bytes}/{totalBytes}b {direction:l}",
                "Page", i, minI, maxI, t.ElapsedMilliseconds, rc.total, batches.Length, maxItems, baseIndex, usedEventsCount, usedBytes, totalBytes, direction)
        let maybePosition = batches |> Array.tryPick Position.tryFromBatch
        events, maybePosition, rc
    // Emits the summary log line and metric event for a completed Query traversal (all pages)
    let private logQuery (direction, minIndex, maxIndex) (container : Container, stream) interval (responsesCount, events : Event array) n (rc : RequestConsumption) (log : ILogger) =
        let count, bytes = events.Length, Event.arrayBytes events
        let reqMetric = Log.metric container.TableName stream interval bytes count rc
        let evt = Log.Metric.Query (direction, responsesCount, reqMetric)
        let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB"
        (log|> Log.event evt).Information(
            "EqxDynamo {action:l} {stream:l} v{n} {ms:f1}ms {ru}RU {count}e/{responses} {bytes}b >{minN} <{maxI}",
            action, stream, n, interval.ElapsedMilliseconds, rc.total, count, responsesCount, bytes, Option.toNullable minIndex, Option.toNullable maxIndex)
let private calculateUsedVersusDroppedPayload stopIndex (xs : Event array) : int * int =
let mutable used, dropped = 0, 0
let mutable found = false
for x in xs do
let bytes = Event.Bytes x
if found then dropped <- dropped + bytes
else used <- used + bytes
if x.i = stopIndex then found <- true
used, dropped
    // Result of scanning one source (Tip / Primary / Archive): the decoded events plus the index range covered
    [<RequireQualifiedAccess; NoComparison; NoEquality>]
    type ScanResult<'event> = { found : bool; minIndex : int64; next : int64; maybeTipPos : Position option; events : 'event array }
    // Interprets already-loaded Tip content as a ScanResult, searching backwards for an origin event
    let scanTip (tryDecode : ITimelineEvent<EncodedBody> -> 'event voption, isOrigin : 'event -> bool) (pos : Position, i : int64, xs : ITimelineEvent<InternalBody> array)
        : ScanResult<'event> =
        let items = ResizeArray(xs.Length)
        // NOTE: the predicate deliberately side-effects - each decodable event is prepended to `items` as
        // tryFindBack walks from the rear, so `items` ends up in ascending order
        let isOrigin' e =
            match EncodedBody.ofInternal e |> tryDecode with
            | ValueNone -> false
            | ValueSome e ->
                items.Insert(0, e) // WalkResult always renders events ordered correctly - here we're aiming to align with Enum.EventsAndUnfolds
                isOrigin e
        let f, e = xs |> Seq.tryFindBack isOrigin' |> Option.isSome, items.ToArray()
        { found = f; maybeTipPos = Some pos; minIndex = i; next = pos.index + 1L; events = e }
    // Pages through the Batches (typically backwards), decoding as it goes and stopping once an origin
    // event is encountered; yields the decoded events in ascending order plus the Tip position, if traversed
    let scan<'event> (log : ILogger) (container, stream) consistentRead maxItems maxRequests direction
        (tryDecode : ITimelineEvent<EncodedBody> -> 'event voption, isOrigin : 'event -> bool)
        (minIndex, maxIndex, ct)
        : Task<ScanResult<'event> option> = task {
        let mutable found = false
        let mutable responseCount = 0
        // Drains the paged responses, accumulating RU consumption and capturing the Tip position from the first page that has one
        let mergeBatches (log : ILogger) (batchesBackward : IAsyncEnumerable<Event array * Position option * RequestConsumption>) = task {
            let mutable lastResponse, maybeTipPos, ru = None, None, 0.
            let! events =
                batchesBackward
                |> TaskSeq.collectSeq (fun (events, maybePos, rc) ->
                    if Option.isNone maybeTipPos then maybeTipPos <- maybePos
                    lastResponse <- Some events; ru <- ru + rc.total
                    responseCount <- responseCount + 1
                    seq { for x in events -> struct (x, x |> EncodedBody.ofInternal |> tryDecode) })
                // Inclusive: the origin event itself is retained; everything beyond it is not enumerated
                |> TaskSeq.takeWhileInclusive (function
                    | struct (x, ValueSome e) when isOrigin e ->
                        found <- true
                        let log = log |> Log.prop "stream" stream
                        let logLevel = if x.i = 0 then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
                        match lastResponse with
                        | None -> log.Write(logLevel, "EqxDynamo Stop @{index} {case}", x.i, x.c)
                        | Some batch ->
                            let used, residual = batch |> calculateUsedVersusDroppedPayload x.i
                            log.Write(logLevel, "EqxDynamo Stop @{index} {case} used {used}b residual {residual}b", x.i, x.c, used, residual)
                        false
                    | _ -> true)
                |> TaskSeq.toArrayAsync
            return events, maybeTipPos, { total = ru } }
        let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream
        let readLog = log |> Log.prop "direction" direction
        let batches ct : IAsyncEnumerable<Event array * Position option * RequestConsumption> =
            mkQuery readLog (container, stream) consistentRead maxItems (direction, minIndex, maxIndex) ct
            |> TaskSeq.map (mapPage direction (container, stream) (minIndex, maxIndex, maxItems) maxRequests readLog)
        let! t, (events, maybeTipPos, ru) = batches >> mergeBatches log |> Stopwatch.time ct
        let raws = Array.map ValueTuple.fst events
        // Forward reads arrive in ascending Index order already; backward reads need reversing
        let decoded = if direction = Direction.Forward then Array.chooseV ValueTuple.snd events else let xs = Array.chooseV ValueTuple.snd events in Array.Reverse xs; xs
        let minMax = (None, raws) ||> Array.fold (fun acc x -> let i = int64 x.i in Some (match acc with None -> i, i | Some (n, x) -> min n i, max x i))
        let version =
            match maybeTipPos, minMax with
            | Some { index = max }, _
            | _, Some (_, max) -> max + 1L
            | None, None -> 0L
        log |> logQuery (direction, minIndex, maxIndex) (container, stream) t (responseCount, raws) version ru
        match minMax, maybeTipPos with
        | Some (i, m), _ -> return Some ({ found = found; minIndex = i; next = m + 1L; maybeTipPos = maybeTipPos; events = decoded } : ScanResult<_>)
        | None, Some { index = tipI } -> return Some { found = found; minIndex = tipI; next = tipI; maybeTipPos = maybeTipPos; events = [||] }
        | None, _ -> return None }
    // Lazily yields pages of decoded events for streaming consumption, stopping after the page containing an
    // origin event (or when the request cap trips); always emits the Query summary on completion, even on early exit
    let walkLazy<'event> (log : ILogger) (container, stream) maxItems maxRequests
        (tryDecode : ITimelineEvent<EncodedBody> -> 'event option, isOrigin : 'event -> bool)
        (direction, minIndex, maxIndex) ct
        : IAsyncEnumerable<'event array> = taskSeq {
        let query = mkQuery log (container, stream) (*consistentRead*)false maxItems (direction, minIndex, maxIndex)
        let readPage = mapPage direction (container, stream) (minIndex, maxIndex, maxItems) maxRequests
        let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream
        let readLog = log |> Log.prop "direction" direction
        let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
        let query = query ct |> TaskSeq.map (readPage readLog)
        let allEvents = ResizeArray()
        let mutable i, ru = 0, 0.
        try let mutable ok = true
            use e = query.GetAsyncEnumerator(ct)
            while ok do
                let batchLog = readLog |> Log.prop "batchIndex" i
                // NOTE(review): this guard uses `i + 1 >= mr` whereas mapPage's equivalent uses `i >= mr` - confirm the off-by-one is intended
                match maxRequests with
                | Some mr when i + 1 >= mr -> batchLog.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded"
                | _ -> ()
                let! more = e.MoveNextAsync()
                if not more then ok <- false else // rest of block does not happen, while exits
                let events, _pos, rc = e.Current
                ru <- ru + rc.total
                allEvents.AddRange(events)
                let acc = ResizeArray()
                for x in events do
                    match x |> EncodedBody.ofInternal |> tryDecode with
                    | Some e when isOrigin e ->
                        // Origin found: include it, finish this page, then stop iterating
                        let used, residual = events |> calculateUsedVersusDroppedPayload x.i
                        log.Information("EqxDynamo Stop stream={stream} at={index} {case} used={used} residual={residual}",
                            stream, x.i, x.c, used, residual)
                        ok <- false
                        acc.Add e
                    | Some e -> acc.Add e
                    | None -> ()
                i <- i + 1
                yield acc.ToArray()
        finally
            // Summary log runs regardless of how the enumeration terminated; n is rendered as -1 (unknown)
            let endTicks = System.Diagnostics.Stopwatch.GetTimestamp()
            let t = StopwatchInterval(startTicks, endTicks)
            log |> logQuery (direction, minIndex, maxIndex) (container, stream) t (i, allEvents.ToArray()) -1L { total = ru } }
type [<NoComparison; NoEquality>] LoadRes = Pos of Position | Empty | Next of int64
let toPosition = function Pos p -> Some p | Empty -> None | Next _ -> failwith "unexpected"
    /// Manages coalescing of spans of events obtained from various sources:
    /// 1) Tip Data and/or Conflicting events
    /// 2) Querying Primary for predecessors of what's obtained from 1
    /// 3) Querying Archive for predecessors of what's obtained from 2
    let load (log : ILogger) (minIndex, maxIndex) (tip : ScanResult<'event> option)
        (primary : int64 option * int64 option * CancellationToken -> Task<ScanResult<'event> option>)
        // Choice1Of2 -> indicates whether it's acceptable to ignore missing events; Choice2Of2 -> Fallback store
        (fallback : Choice<bool, int64 option * int64 option * CancellationToken -> Task<ScanResult<'event> option>>) ct
        : Task<Position option * 'event array> = task {
        let minI = defaultArg minIndex 0L
        // Fast paths: the Tip alone suffices when it held the origin, or already reaches back to minIndex
        match tip with
        | Some { found = true; maybeTipPos = Some p; events = e } -> return Some p, e
        | Some { minIndex = i; maybeTipPos = Some p; events = e } when i <= minI -> return Some p, e
        | _ ->
            let i, events, pos =
                match tip with
                | Some { minIndex = i; maybeTipPos = p; events = e } -> Some i, e, p
                | None -> maxIndex, Array.empty, None
            // Query Primary for the events preceding what the Tip supplied (i.e. below index i)
            let! primary = primary (minIndex, i, ct)
            let events, pos =
                match primary with
                | None -> events, match pos with Some p -> Pos p | None -> Empty
                | Some primary -> Array.append primary.events events, match pos |> Option.orElse primary.maybeTipPos with Some p -> Pos p | None -> Next primary.next
            let inline logMissing (minIndex, maxIndex) message =
                if log.IsEnabled Events.LogEventLevel.Debug then
                    (log|> fun log -> match minIndex with None -> log | Some mi -> log |> Log.prop "minIndex" mi
                        |> fun log -> match maxIndex with None -> log | Some mi -> log |> Log.prop "maxIndex" mi)
                        .Debug(message)
            match primary, fallback with
            | Some { found = true }, _ -> return toPosition pos, events // origin found in primary, no need to look in fallback
            | Some { minIndex = i }, _ when i <= minI -> return toPosition pos, events // primary had required earliest event Index, no need to look at fallback
            | None, _ when Option.isNone tip -> return toPosition pos, events // initial load where no documents present in stream
            | _, Choice1Of2 allowMissing ->
                logMissing (minIndex, i) "Origin event not found; no Archive Table supplied"
                if allowMissing then return toPosition pos, events
                else return failwithf "Origin event not found; no Archive Table supplied"
            | _, Choice2Of2 fallback ->
                let maxIndex = match primary with Some p -> Some p.minIndex | None -> maxIndex // if no batches in primary, high water mark from tip is max
                // Query the Archive for whatever Primary could not supply
                let! fallback = fallback (minIndex, maxIndex, ct)
                let events =
                    match fallback with
                    | Some s -> Array.append s.events events
                    | None -> events
                match fallback with
                | Some { minIndex = i } when i <= minI -> ()
                | Some { found = true } -> ()
                | _ -> logMissing (minIndex, maxIndex) "Origin event not found in Archive Table"
                return toPosition pos, events }
// Manages deletion of (full) Batches, and trimming of events in Tip, maintaining ordering guarantees by never updating non-Tip batches
// Additionally, the nature of the fallback algorithm requires that deletions be carried out in sequential order so as not to leave gaps
// NOTE: module is public so BatchIndices can be deserialized into
module internal Prune =

    /// Removes all events with index <= indexInclusive, walking the Batches in ascending `n` order so no
    /// observable gap is ever left in the sequence. Whole non-Tip Batches are deleted outright; the Tip is
    /// trimmed in place, preserving its unfolds and position (`n`).
    /// Returns (eventsDeleted, eventsDeferred, lowWaterMark).
    let until (log : ILogger) (container : Container, stream : string) maxItems indexInclusive ct : Task<int * int * int64> = task {
        let log = log |> Log.prop "stream2" stream
        // Deletes one complete (non-Tip) Batch item; i = the item's index key, count = events it held (for metrics)
        let deleteItem i count : Task<RequestConsumption> = task {
            let! t, rc = (fun ct -> container.DeleteItem(stream, i, ct)) |> Stopwatch.time ct
            let reqMetric = Log.metric container.TableName stream t -1 count rc
            let log = let evt = Log.Metric.Delete reqMetric in log |> Log.event evt
            // Render the RU total (a float) for {ru}, rather than the whole RequestConsumption record,
            // consistent with the Trim/PruneResponse/Prune messages below
            log.Information("EqxDynamo {action:l} {i} {ms:f1}ms {ru}RU", "Delete", i, t.ElapsedMilliseconds, rc.total)
            return rc }
        // Removes the first `count` events from the Tip via an etag-guarded conditional update
        let trimTip expectedN count = task {
            match! container.TryGetTip(stream, (*consistentRead = *)false, ct) with
            | None, _rc -> return failwith "unexpected NotFound"
            | Some tip, _rc when tip.n <> expectedN -> return failwithf "Concurrent write detected; Expected n=%d actual=%d" expectedN tip.n
            | Some tip, tipRc ->
                let tC, tE = Batch.eventsToSchema tip.e
                let tC', tE' = Array.skip count tC, Array.skip count tE
                let updEtag = let g = Guid.NewGuid() in g.ToString "N"
                let condExpr : Quotations.Expr<Batch.Schema -> bool> = <@ fun t -> t.etag = Some tip.etag @>
                let updateExpr : Quotations.Expr<Batch.Schema -> _> = <@ fun t -> { t with etag = Some updEtag; c = tC'; e = tE' } @>
                let! t, (_updated, updRc) = (fun ct -> container.TryUpdateTip(stream, updateExpr, ct, condExpr)) |> Stopwatch.time ct
                let rc = { total = tipRc.total + updRc.total }
                let reqMetric = Log.metric container.TableName stream t -1 count rc
                let log = let evt = Log.Metric.Trim reqMetric in log |> Log.event evt
                log.Information("EqxDynamo {action:l} {count} {ms:f1}ms {ru}RU", "Trim", count, t.ElapsedMilliseconds, rc.total)
                return rc }
        let log = log |> Log.prop "index" indexInclusive
        // need to sort by n to guarantee we don't ever leave an observable gap in the sequence
        let query ct = container.QueryIAndNOrderByNAscending(stream, maxItems, ct)
        // Logs each page of BatchIndices as it arrives
        let mapPage (i, t : StopwatchInterval, batches : BatchIndices array, rc) =
            let next = Array.tryLast batches |> Option.map (fun x -> x.n)
            let reqMetric = Log.metric container.TableName stream t -1 batches.Length rc
            let log = let evt = Log.Metric.PruneResponse reqMetric in log |> Log.prop "batchIndex" i |> Log.event evt
            log.Information("EqxDynamo {action:l} {batches} {ms:f1}ms n={next} {ru}RU",
                "PruneResponse", batches.Length, t.ElapsedMilliseconds, Option.toNullable next, rc.total)
            batches, rc
        let! pt, outcomes =
            let isRelevant (x : BatchIndices) = x.index <= indexInclusive || x.isTip
            // Processes one page of Batches, deleting/trimming as appropriate and tracking the new low water mark
            let handle (batches : BatchIndices array, rc) = task {
                let mutable delCharges, batchesDeleted, trimCharges, batchesTrimmed, eventsDeleted, eventsDeferred = 0., 0, 0., 0, 0, 0
                let mutable lwm = None
                for x in batches |> Seq.takeWhile (fun x -> isRelevant x || lwm = None) do
                    let batchSize = x.n - x.index |> int
                    // Events within this Batch that fall inside the prune range
                    let eligibleEvents = max 0 (min batchSize (int (indexInclusive + 1L - x.index)))
                    if x.isTip then // Even if we remove the last event from the Tip, we need to retain a) unfolds b) position (n)
                        if eligibleEvents <> 0 then
                            let! charge = trimTip x.n eligibleEvents
                            trimCharges <- trimCharges + charge.total
                            batchesTrimmed <- batchesTrimmed + 1
                            eventsDeleted <- eventsDeleted + eligibleEvents
                        if lwm = None then
                            lwm <- Some (x.index + int64 eligibleEvents)
                    elif x.n <= indexInclusive + 1L then
                        // Entire Batch falls within the prune range - delete the item wholesale
                        let! charge = deleteItem x.index batchSize
                        delCharges <- delCharges + charge.total
                        batchesDeleted <- batchesDeleted + 1
                        eventsDeleted <- eventsDeleted + batchSize
                    else // can't update a non-Tip batch, or it'll be ordered wrong from a CFP perspective
                        eventsDeferred <- eventsDeferred + eligibleEvents
                        if lwm = None then
                            lwm <- Some x.index
                return (rc, delCharges, trimCharges), lwm, (batchesDeleted + batchesTrimmed, eventsDeleted, eventsDeferred) }
            let hasRelevantItems (batches, _rc) = batches |> Array.exists isRelevant
            let load ct =
                query ct
                |> TaskSeq.map mapPage
                |> TaskSeq.takeWhile hasRelevantItems
                |> TaskSeq.mapAsync handle
                |> TaskSeq.toArrayAsync
            load |> Stopwatch.time ct
        // Fold the per-page outcomes into overall totals for the summary metric/log line
        let mutable lwm, queryCharges, delCharges, trimCharges, responses, batches, eventsDeleted, eventsDeferred = None, 0., 0., 0., 0, 0, 0, 0
        let accumulate ((qc, dc, tc), bLwm, (bCount, eDel, eDef)) =
            lwm <- max lwm bLwm // max over options: Some x supersedes None
            queryCharges <- queryCharges + qc.total
            delCharges <- delCharges + dc
            trimCharges <- trimCharges + tc
            responses <- responses + 1
            batches <- batches + bCount
            eventsDeleted <- eventsDeleted + eDel
            eventsDeferred <- eventsDeferred + eDef
        outcomes |> Array.iter accumulate
        let reqMetric = Log.metric container.TableName stream pt eventsDeleted batches { total = queryCharges }
        let log = let evt = Log.Metric.Prune (responses, reqMetric) in log |> Log.event evt
        let lwm = lwm |> Option.defaultValue 0L // If we've seen no batches at all, then the write position is 0L
        log.Information("EqxDynamo {action:l} {events}/{batches} lwm={lwm} {ms:f1}ms queryRu={queryRu} deleteRu={deleteRu} trimRu={trimRu}",
            "Prune", eventsDeleted, batches, lwm, pt.ElapsedMilliseconds, queryCharges, delCharges, trimCharges)
        return eventsDeleted, eventsDeferred, lwm }
// Internal payload boxed into a StreamToken: the stream's Position, if any
type [<NoComparison; NoEquality>] Token = { pos : Position option }
module Token =
let create_ pos : StreamToken =
let v, b = Position.toVersionAndStreamBytes pos
{ value = box { pos = pos }; version = v; streamBytes = b }