Skip to content

Commit

Permalink
Remove AsyncSeq dependency from Equinox.Core
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 27, 2019
1 parent bd2ac9d commit 8fb23c2
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ The components within this repository are delivered as multi-targeted Nuget pack

### Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in realizing the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation . ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `FSharp.Control.AsyncSeq`, `System.Runtime.Caching`
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance baselining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`)
- `Equinox.EventStore` [![EventStore NuGet](https://img.shields.io/nuget/v/Equinox.EventStore.svg)](https://www.nuget.org/packages/Equinox.EventStore/): Production-strength [EventStore](https://eventstore.org/) Adapter instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.EventStore) on `Equinox`, `EventStore.Client[Api.NetCore] >= 5.0.1`, `System.Runtime.Caching`, `FSharp.Control.AsyncSeq`)
- `Equinox.Cosmos` [![Cosmos NuGet](https://img.shields.io/nuget/v/Equinox.Cosmos.svg)](https://www.nuget.org/packages/Equinox.Cosmos/): Production-strength Azure CosmosDb Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.Cosmos) on `Equinox`, `Microsoft.Azure.DocumentDb[.Core] >= 2`, `System.Runtime.Caching`, `Newtonsoft.Json >= 11.0.2`, `FSharp.Control.AsyncSeq`)
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in realizing the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation . ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance baselining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`)
- `Equinox.EventStore` [![EventStore NuGet](https://img.shields.io/nuget/v/Equinox.EventStore.svg)](https://www.nuget.org/packages/Equinox.EventStore/): Production-strength [EventStore](https://eventstore.org/) Adapter instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.EventStore) on `Equinox.Core`, `EventStore.Client[Api.NetCore] >= 5.0.1`, `System.Runtime.Caching`, `FSharp.Control.AsyncSeq`)
- `Equinox.Cosmos` [![Cosmos NuGet](https://img.shields.io/nuget/v/Equinox.Cosmos.svg)](https://www.nuget.org/packages/Equinox.Cosmos/): Production-strength Azure CosmosDb Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.Cosmos) on `Equinox.Core`, `Microsoft.Azure.DocumentDb[.Core] >= 2`, `System.Runtime.Caching`, `Newtonsoft.Json >= 11.0.2`, `FSharp.Control.AsyncSeq`)

### Projection libraries

Expand Down
3 changes: 1 addition & 2 deletions src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<DefineConstants Condition=" '$(TargetFramework)' == 'net461' ">$(DefineConstants);NET461</DefineConstants>
<DefineConstants>$(DefineConstants);NO_ASYNCSEQ</DefineConstants>
</PropertyGroup>

<ItemGroup>
Expand All @@ -27,8 +28,6 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<PackageReference Include="FSharp.Control.AsyncSeq" Version="2.0.21" />
</ItemGroup>

</Project>
2 changes: 2 additions & 0 deletions src/Equinox.Core/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Async with
sc ())
|> ignore)

#if !NO_ASYNCSEQ
module AsyncSeq =
/// Same as takeWhileAsync, but returns the final element too
let takeWhileInclusiveAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
Expand All @@ -84,6 +85,7 @@ module AsyncSeq =
/// Same as takeWhile, but returns the final element too
let takeWhileInclusive p (source : AsyncSeq<'T>) =
takeWhileInclusiveAsync (p >> async.Return) source
#endif

[<RequireQualifiedAccess>]
module Regex =
Expand Down
30 changes: 15 additions & 15 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]

/// The Domain Events (as opposed to Unfolded Events, see Tip) at this offset in the stream
e: Event[] }
/// Unless running in single partion mode (which would restrict us to 10GB per collection)
/// Unless running in single partition mode (which would restrict us to 10GB per collection)
/// we need to nominate a partition key that will be in every document
static member internal PartitionKeyField = "p"
/// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use
Expand Down Expand Up @@ -142,7 +142,6 @@ type [<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]

/// Compaction/Snapshot/Projection events - owned and managed by the sync stored proc
u: Unfold[] }
/// arguably this should be a high nember to reflect fact it is the freshest ?
static member internal WellKnownDocumentId = "-1"

/// Position and Etag to which an operation is relative
Expand Down Expand Up @@ -527,11 +526,11 @@ function sync(req, expIndex, expEtag) {
try let! r = c.Client.CreateStoredProcedureAsync(c.CollectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect
return r.RequestCharge
with CosmosException ((CosmosStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge }
let private mkContainerProperties idFieldName partionKeyFieldName =
let private mkContainerProperties idFieldName partitionKeyFieldName =
// While the v2 SDK and earlier portal versions admitted 'fixed' collections where no Partition Key is defined, we follow the recent policy
// simplification of having a convention of always defining a partion key
// simplification of having a convention of always defining a partition key
let pkd = PartitionKeyDefinition()
pkd.Paths.Add(sprintf "/%s" partionKeyFieldName)
pkd.Paths.Add(sprintf "/%s" partitionKeyFieldName)
DocumentCollection(Id = idFieldName, PartitionKey = pkd)
let private createBatchAndTipContainerIfNotExists (client: Client.DocumentClient) (dName,cName) mode : Async<unit> =
let def = mkContainerProperties cName Batch.PartitionKeyField
Expand Down Expand Up @@ -561,7 +560,7 @@ function sync(req, expIndex, expEtag) {
if not skipStoredProc then
do! createSyncStoredProcIfNotExists (Some log) container }
let initAux (client: Client.DocumentClient) (dName,cName) rus = async {
// Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partion keys)
// Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partition keys)
let mode = Provisioning.Container rus
do! createOrProvisionDatabase client dName mode
return! createAuxContainerIfNotExists client (dName,cName) mode }
Expand Down Expand Up @@ -669,11 +668,11 @@ module internal Tip =
: Async<Position * 'event[]> = async {
let responseCount = ref 0
let mergeBatches (log : ILogger) (batchesBackward: AsyncSeq<ITimelineEvent<byte[]>[] * Position option * float>) = async {
let mutable lastResponse, mapbeTipPos, ru = None, None, 0.
let mutable lastResponse, maybeTipPos, ru = None, None, 0.
let! events =
batchesBackward
|> AsyncSeq.map (fun (events, maybePos, r) ->
if mapbeTipPos = None then mapbeTipPos <- maybePos
if maybeTipPos = None then maybeTipPos <- maybePos
lastResponse <- Some events; ru <- ru + r
incr responseCount
events |> Array.map (fun x -> x, tryDecode x))
Expand All @@ -689,7 +688,7 @@ module internal Tip =
false
| _ -> true) (*continue the search*)
|> AsyncSeq.toArrayAsync
return events, mapbeTipPos, ru }
return events, maybeTipPos, ru }
let query = mkQuery (container,stream) maxItems direction startPos
let pullSlice = handleResponse direction stream startPos
let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query)
Expand Down Expand Up @@ -1086,7 +1085,7 @@ type ConnectionMode =
| DirectHttps

type Connector
( /// Timeout to apply to individual reads/write roundtrips going to CosmosDb
( /// Timeout to apply to individual reads/write round-trips going to CosmosDb
requestTimeout: TimeSpan,
/// Maximum number of times attempt when failure reason is a 429 from CosmosDb, signifying RU limits have been breached
maxRetryAttemptsOnThrottledRequests: int,
Expand Down Expand Up @@ -1154,6 +1153,7 @@ namespace Equinox.Cosmos.Core
open Equinox.Cosmos
open Equinox.Cosmos.Store
open FsCodec
open FSharp.Control
open System.Runtime.InteropServices

/// Outcome of appending events, specifying the new and/or conflicting events, together with the updated Target write position
Expand All @@ -1163,7 +1163,7 @@ type AppendResult<'t> =
| Conflict of index: 't * conflictingEvents: ITimelineEvent<byte[]>[]
| ConflictUnknown of index: 't

/// Encapsulates the core facilites Equinox.Cosmos offers for operating directly on Events in Streams.
/// Encapsulates the core facilities Equinox.Cosmos offers for operating directly on Events in Streams.
type Context
( /// Connection to CosmosDb, includes defined Transient Read and Write Retry policies
conn : Connection,
Expand Down Expand Up @@ -1195,7 +1195,7 @@ type Context
member __.ResolveStream(streamName) = containers.Resolve(conn.Client, null, streamName, gateway.CreateSyncStoredProcIfNotExists (Some log))
member __.CreateStream(streamName) = __.ResolveStream streamName |> fst

member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : FSharp.Control.AsyncSeq<ITimelineEvent<byte[]>[]> =
member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : AsyncSeq<ITimelineEvent<byte[]>[]> =
let direction = defaultArg direction Direction.Forward
let batching = BatchingPolicy(defaultArg batchSize batching.MaxItems)
gateway.ReadLazy batching log stream direction startPos (Some,fun _ -> false)
Expand All @@ -1212,15 +1212,15 @@ type Context
| None -> fun _ -> false
return! gateway.Read log stream direction startPos (Some,isOrigin) }

/// Establishes the current position of the stream in as effficient a manner as possible
/// Establishes the current position of the stream in as efficient a manner as possible
/// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks)
member __.Sync(stream, ?position: Position) : Async<Position> = async {
let! (Token.Unpack (_,_,pos')) = gateway.GetPosition(log, stream, ?pos=position)
return pos' }

/// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query
/// ... NB as long as they Dispose!
member __.Walk(stream, batchSize, ?position, ?direction) : FSharp.Control.AsyncSeq<ITimelineEvent<byte[]>[]> =
member __.Walk(stream, batchSize, ?position, ?direction) : AsyncSeq<ITimelineEvent<byte[]>[]> =
__.GetLazy((stream, position), batchSize, ?direction=direction)

/// Reads all Events from a `Position` in a given `direction`
Expand Down Expand Up @@ -1310,7 +1310,7 @@ module Events =
/// reading in batches of the specified size.
/// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest
/// sequence number in the stream.
let getAllBackwards (ctx: Context) (streamName: string) (MaxPosition index: int64) (batchSize: int): FSharp.Control.AsyncSeq<ITimelineEvent<byte[]>[]> =
let getAllBackwards (ctx: Context) (streamName: string) (MaxPosition index: int64) (batchSize: int): AsyncSeq<ITimelineEvent<byte[]>[]> =
ctx.Walk(ctx.CreateStream streamName, batchSize, ?position=index, direction=Direction.Backward)

/// Returns an async array of events in the stream backwards starting from the specified sequence number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

<ItemGroup>
<PackageReference Include="FsCheck.xUnit" Version="2.13.0" />
<PackageReference Include="FSharp.Control.AsyncSeq" Version="2.0.21" />
<PackageReference Include="JsonDiffPatch.Net" Version="2.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
Expand Down

0 comments on commit 8fb23c2

Please sign in to comment.