From 2396e0252a2dd14c89d4ab0e5bf75715b1afe32d Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Mon, 16 Sep 2024 14:06:50 +0200 Subject: [PATCH 1/8] Migrated to central package management, simplifying package upgrades --- Directory.Packages.props | 77 +++++++++++++++++++ Docker/Development/Dockerfile | 1 + Docker/Production/Dockerfile | 1 + Integration/Benchmarks/Benchmarks.csproj | 2 +- Integration/Shared/Shared.csproj | 4 +- Integration/Tests/Services/Services.csproj | 6 +- Runtime.sln | 1 + Source/Actors/Actors.csproj | 13 ++-- Source/CLI/CLI.csproj | 14 ++-- Source/Client/Client.csproj | 4 +- .../Configuration.Management.csproj | 14 ++-- Source/Configuration/Configuration.csproj | 16 ++-- .../DependencyInversion.csproj | 10 +-- Source/Diagnostics/Diagnostics.csproj | 20 ++--- Source/EventHorizon/EventHorizon.csproj | 4 +- .../Events.Store.MongoDB.csproj | 6 +- .../Events.Store.Services.Grpc.csproj | 2 +- Source/Events/Events.csproj | 16 ++-- Source/Execution/Execution.csproj | 4 +- Source/Hosting/Hosting.csproj | 2 +- Source/Metrics/Metrics.csproj | 8 +- Source/MongoDB/MongoDB.csproj | 4 +- Source/Protobuf/Protobuf.csproj | 4 +- Source/Resources/Resources.csproj | 4 +- Source/Rudimentary/Rudimentary.csproj | 4 +- Source/Server/Server.csproj | 17 ++-- .../Services.Clients/Services.Clients.csproj | 6 +- Source/Services/Services.csproj | 16 ++-- Source/Tenancy/Tenancy.csproj | 4 +- .../Events.Store.MongoDB.Tests.csproj | 2 +- .../Events.Store/Events.Store.csproj | 2 +- Specifications/MongoDB/MongoDB.csproj | 2 +- .../Services.Clients/Services.Clients.csproj | 2 +- Specifications/Services/Services.csproj | 2 +- specs.props | 21 ++--- tests.props | 4 +- versions.props | 29 ------- 37 files changed, 199 insertions(+), 149 deletions(-) create mode 100644 Directory.Packages.props diff --git a/Directory.Packages.props b/Directory.Packages.props new file mode 100644 index 000000000..68aef62d6 --- /dev/null +++ b/Directory.Packages.props @@ -0,0 +1,77 @@ + + + + + + true + + + + 7.8.0 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/Docker/Development/Dockerfile b/Docker/Development/Dockerfile index 736a0b4cd..18679a8bf 100644 --- a/Docker/Development/Dockerfile +++ b/Docker/Development/Dockerfile @@ -6,6 +6,7 @@ ARG TARGETARCH COPY default.props /app/ COPY versions.props /app/ +COPY Directory.Packages.props /app/ COPY Runtime.sln /app/ COPY Source /app/Source/ COPY Specifications /app/Specifications/ diff --git a/Docker/Production/Dockerfile b/Docker/Production/Dockerfile index 19e04b0a8..ada23f654 100644 --- a/Docker/Production/Dockerfile +++ b/Docker/Production/Dockerfile @@ -6,6 +6,7 @@ ARG TARGETARCH COPY default.props /app/ COPY versions.props /app/ +COPY Directory.Packages.props /app/ COPY Runtime.sln /app/ COPY Source /app/Source/ COPY Specifications /app/Specifications/ diff --git a/Integration/Benchmarks/Benchmarks.csproj b/Integration/Benchmarks/Benchmarks.csproj index b6c33b2bd..16a1fa19d 100644 --- a/Integration/Benchmarks/Benchmarks.csproj +++ b/Integration/Benchmarks/Benchmarks.csproj @@ -13,6 +13,6 @@ - + diff --git a/Integration/Shared/Shared.csproj b/Integration/Shared/Shared.csproj index 9f7da1522..6e1a78eb8 100644 --- a/Integration/Shared/Shared.csproj +++ b/Integration/Shared/Shared.csproj @@ -11,7 +11,7 @@ - - + + diff --git a/Integration/Tests/Services/Services.csproj b/Integration/Tests/Services/Services.csproj index 3aee00da3..557e7a057 100644 --- a/Integration/Tests/Services/Services.csproj +++ b/Integration/Tests/Services/Services.csproj @@ -11,9 +11,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Runtime.sln b/Runtime.sln index f3cc593cc..92b76f721 100644 --- a/Runtime.sln +++ b/Runtime.sln @@ -115,6 +115,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".meta", ".meta", "{CAF1D781 specs.props = specs.props tests.props = tests.props README.md = README.md + Directory.Packages.props = Directory.Packages.props EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{89A4E435-785F-4CDF-91B4-A14EAB91084B}" diff --git a/Source/Actors/Actors.csproj b/Source/Actors/Actors.csproj index 9ede44d27..b6e734f57 100644 --- a/Source/Actors/Actors.csproj +++ b/Source/Actors/Actors.csproj @@ -6,16 +6,15 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - + + + + + diff --git a/Source/CLI/CLI.csproj b/Source/CLI/CLI.csproj index e339451f6..0d14ab5cb 100644 --- a/Source/CLI/CLI.csproj +++ b/Source/CLI/CLI.csproj @@ -21,13 +21,13 @@ - - - - - - - + + + + + + + diff --git a/Source/Client/Client.csproj b/Source/Client/Client.csproj index d1b80e989..f8e6269ac 100644 --- a/Source/Client/Client.csproj +++ b/Source/Client/Client.csproj @@ -7,8 +7,8 @@ - - + + diff --git a/Source/Configuration.Management/Configuration.Management.csproj b/Source/Configuration.Management/Configuration.Management.csproj index 22ea88a7a..9444dd4c8 100644 --- a/Source/Configuration.Management/Configuration.Management.csproj +++ b/Source/Configuration.Management/Configuration.Management.csproj @@ -12,12 +12,12 @@ - - - - - - - + + + + + + + diff --git a/Source/Configuration/Configuration.csproj b/Source/Configuration/Configuration.csproj index bad1d552b..79343174c 100644 --- a/Source/Configuration/Configuration.csproj +++ b/Source/Configuration/Configuration.csproj @@ -12,13 +12,13 @@ - - - - - - - - + + + + + + + + diff --git a/Source/DependencyInversion/DependencyInversion.csproj b/Source/DependencyInversion/DependencyInversion.csproj index cdd76aeeb..409b7fb3f 100755 --- a/Source/DependencyInversion/DependencyInversion.csproj +++ b/Source/DependencyInversion/DependencyInversion.csproj @@ -11,11 +11,11 @@ - - - - - + + + + + diff --git a/Source/Diagnostics/Diagnostics.csproj b/Source/Diagnostics/Diagnostics.csproj index c03480faa..e3d1346d0 100644 --- a/Source/Diagnostics/Diagnostics.csproj +++ b/Source/Diagnostics/Diagnostics.csproj @@ -11,16 +11,16 @@ - - - - - - - - - - + + + + + + + + + + diff --git a/Source/EventHorizon/EventHorizon.csproj b/Source/EventHorizon/EventHorizon.csproj index 41f32c06e..e83ea01f8 100644 --- a/Source/EventHorizon/EventHorizon.csproj +++ b/Source/EventHorizon/EventHorizon.csproj @@ -7,8 +7,8 @@ - - + + diff --git a/Source/Events.Store.MongoDB/Events.Store.MongoDB.csproj b/Source/Events.Store.MongoDB/Events.Store.MongoDB.csproj index f0b5db22f..4a8bc9111 100644 --- a/Source/Events.Store.MongoDB/Events.Store.MongoDB.csproj +++ b/Source/Events.Store.MongoDB/Events.Store.MongoDB.csproj @@ -7,9 +7,9 @@ - - - + + + diff --git a/Source/Events.Store.Services.Grpc/Events.Store.Services.Grpc.csproj b/Source/Events.Store.Services.Grpc/Events.Store.Services.Grpc.csproj index 4ee766526..e7d032ea2 100644 --- a/Source/Events.Store.Services.Grpc/Events.Store.Services.Grpc.csproj +++ b/Source/Events.Store.Services.Grpc/Events.Store.Services.Grpc.csproj @@ -7,7 +7,7 @@ - + diff --git a/Source/Events/Events.csproj b/Source/Events/Events.csproj index 05020248c..65a9363ea 100644 --- a/Source/Events/Events.csproj +++ b/Source/Events/Events.csproj @@ -8,17 +8,17 @@ - - - - - - + + + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + diff --git a/Source/Execution/Execution.csproj b/Source/Execution/Execution.csproj index e603a7d8a..570dd79ca 100755 --- a/Source/Execution/Execution.csproj +++ b/Source/Execution/Execution.csproj @@ -13,7 +13,7 @@ - - + + diff --git a/Source/Hosting/Hosting.csproj b/Source/Hosting/Hosting.csproj index 03c6218fe..b6900fdfa 100755 --- a/Source/Hosting/Hosting.csproj +++ b/Source/Hosting/Hosting.csproj @@ -12,6 +12,6 @@ - + diff --git a/Source/Metrics/Metrics.csproj b/Source/Metrics/Metrics.csproj index 2400beefd..cea624df2 100644 --- a/Source/Metrics/Metrics.csproj +++ b/Source/Metrics/Metrics.csproj @@ -13,10 +13,10 @@ - - - - + + + + diff --git a/Source/MongoDB/MongoDB.csproj b/Source/MongoDB/MongoDB.csproj index 166c4e27f..4964a63d5 100644 --- a/Source/MongoDB/MongoDB.csproj +++ b/Source/MongoDB/MongoDB.csproj @@ -7,7 +7,7 @@ - - + + diff --git a/Source/Protobuf/Protobuf.csproj b/Source/Protobuf/Protobuf.csproj index de02cf391..d5dee17dd 100644 --- a/Source/Protobuf/Protobuf.csproj +++ b/Source/Protobuf/Protobuf.csproj @@ -13,7 +13,7 @@ - - + + diff --git a/Source/Resources/Resources.csproj b/Source/Resources/Resources.csproj index e525032f7..3661176a2 100644 --- a/Source/Resources/Resources.csproj +++ b/Source/Resources/Resources.csproj @@ -12,7 +12,7 @@ - - + + diff --git a/Source/Rudimentary/Rudimentary.csproj b/Source/Rudimentary/Rudimentary.csproj index c221dda3a..67185c5e6 100644 --- a/Source/Rudimentary/Rudimentary.csproj +++ b/Source/Rudimentary/Rudimentary.csproj @@ -6,7 +6,7 @@ - - + + diff --git a/Source/Server/Server.csproj b/Source/Server/Server.csproj index 60bdd7429..9f89e8dc5 100644 --- a/Source/Server/Server.csproj +++ b/Source/Server/Server.csproj @@ -13,15 +13,14 @@ - - - - - - - - - + + + + + + + + diff --git a/Source/Services.Clients/Services.Clients.csproj b/Source/Services.Clients/Services.Clients.csproj index 3bb2ef18e..76dfe2654 100644 --- a/Source/Services.Clients/Services.Clients.csproj +++ b/Source/Services.Clients/Services.Clients.csproj @@ -13,8 +13,8 @@ - - - + + + diff --git a/Source/Services/Services.csproj b/Source/Services/Services.csproj index 5b4ed3985..9f5b45b12 100644 --- a/Source/Services/Services.csproj +++ b/Source/Services/Services.csproj @@ -16,13 +16,13 @@ - - - - - - - - + + + + + + + + diff --git a/Source/Tenancy/Tenancy.csproj b/Source/Tenancy/Tenancy.csproj index d3118b80d..df093f335 100644 --- a/Source/Tenancy/Tenancy.csproj +++ b/Source/Tenancy/Tenancy.csproj @@ -13,7 +13,7 @@ - - + + diff --git a/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj b/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj index 35dde849f..795b5beb4 100644 --- a/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj +++ b/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj @@ -7,7 +7,7 @@ - + diff --git a/Specifications/Events.Store/Events.Store.csproj b/Specifications/Events.Store/Events.Store.csproj index ad4119584..c16213a9d 100755 --- a/Specifications/Events.Store/Events.Store.csproj +++ b/Specifications/Events.Store/Events.Store.csproj @@ -8,7 +8,7 @@ - + diff --git a/Specifications/MongoDB/MongoDB.csproj b/Specifications/MongoDB/MongoDB.csproj index 99e126f3f..b4e244f9f 100755 --- a/Specifications/MongoDB/MongoDB.csproj +++ b/Specifications/MongoDB/MongoDB.csproj @@ -11,7 +11,7 @@ - + diff --git a/Specifications/Services.Clients/Services.Clients.csproj b/Specifications/Services.Clients/Services.Clients.csproj index 767a32649..67ec718fc 100755 --- a/Specifications/Services.Clients/Services.Clients.csproj +++ b/Specifications/Services.Clients/Services.Clients.csproj @@ -10,6 +10,6 @@ - + diff --git a/Specifications/Services/Services.csproj b/Specifications/Services/Services.csproj index 137585dd1..81c167542 100755 --- a/Specifications/Services/Services.csproj +++ b/Specifications/Services/Services.csproj @@ -11,7 +11,7 @@ - + diff --git a/specs.props b/specs.props index 17e36bd64..72addb79f 100644 --- a/specs.props +++ b/specs.props @@ -1,22 +1,23 @@ + - net7.0;net8.0 + net8.0 true IDE1006;IDE0044;IDE0051;IDE0052;CA2211;CS0612;CS0169;CS8981;RCS1169;RCS1018;RCS1213 - - - - - - - - - + + + + + + + + + $(NuGetPackageRoot)dolittle.contracts/$(ContractsVersion)/protos/ diff --git a/tests.props b/tests.props index f982dea3f..cb1c2a9d3 100644 --- a/tests.props +++ b/tests.props @@ -16,12 +16,12 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/versions.props b/versions.props index 0e3d8454b..ae92daff4 100644 --- a/versions.props +++ b/versions.props @@ -1,34 +1,5 @@ - 8.0.0 - 9.0.0 - 2.6.1 7.8.0 - 6.0.2 - 2.* - 2.65.0 - 2.46.6 - 2.65.0 - 2.10.2 - 1.1.2 - 17.10.0 - 8.0.0 - 2.28.0 - 1.5.0 - 4.18.* - 6.12.0 - 2.2.0 - 13.0.3 - 5.1.2 - 8.4.1 - 8.2.1 - 4.4.0 - 1.6.0 - 3.27.3 - 6.7.1 - 6.0.1 - 4.3.0 - 2.9.0 - 2.8.2 From 09c8f7c40b41eb8c7997f82abad816342931356c Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Mon, 16 Sep 2024 14:10:18 +0200 Subject: [PATCH 2/8] Version bumps --- Directory.Packages.props | 150 +++++++++++++++++++-------------------- 1 file changed, 74 insertions(+), 76 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 68aef62d6..e123b3b0c 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,77 +1,75 @@ - - - - - true - - - - 7.8.0 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + true + + + 7.8.0 + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From 092d81526a2f2ecb13603d919cb161c5e1a5d1ac Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Mon, 16 Sep 2024 14:13:32 +0200 Subject: [PATCH 3/8] Do not consider sparse event logs an error --- Source/Events.Store.MongoDB/CommittedEventsFetcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs index 434fb3880..c6964c93f 100644 --- a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs +++ b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs @@ -88,7 +88,7 @@ public async Task FetchNextSequenceNumber(ScopeId scope, if (nextSequenceNumber != eventCount) { - _logger.LogError("Last event sequence number was {LastEventSequenceNumber}, but event count was {EventCount}", lastEvent.EventLogSequenceNumber, + _logger.LogInformation("Sparse event log: Last event sequence number was {LastEventSequenceNumber}, but event count was {EventCount}", lastEvent.EventLogSequenceNumber, eventCount); } From e4d28d693b512b5dc27e12596a0634a35824ffbf Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Mon, 16 Sep 2024 14:29:35 +0200 Subject: [PATCH 4/8] Optimize committer --- Source/Events.Store.MongoDB/CommittedEventsFetcher.cs | 2 +- Source/Events.Store.MongoDB/Persistence/CommitWriter.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs index c6964c93f..bd974d50e 100644 --- a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs +++ b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs @@ -69,7 +69,7 @@ public async Task FetchNextSequenceNumber(ScopeId scope, cancellationToken: cancellationToken) .ConfigureAwait(false); - if (eventCount == 0) return 0ul; // No events means no need to double check + if (eventCount == 0) return 0ul; // No events means no need to double-check var lastEvent = await eventLog .Find(_eventFilter.Empty) diff --git a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs index 17de0a4ea..c714988cb 100644 --- a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs +++ b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs @@ -58,7 +58,7 @@ await _streams.DefaultEventLog.InsertManyAsync( await _aggregateVersions.UpdateAggregateVersions(session, commit, cancellationToken).ConfigureAwait(false); await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false); //TODO: Notifying for events should be a concern handled by actors - _streamWatcher.NotifyForEvent(ScopeId.Default, StreamId.EventLog, eventsToStore.Max(_ => _.EventLogSequenceNumber)); + _streamWatcher.NotifyForEvent(ScopeId.Default, StreamId.EventLog, commit.LastSequenceNumber.Value); return Try.Succeeded; } catch (MongoWaitQueueFullException ex) From c9e93681b5111ad9c73d7bfa26134a8d7db55c22 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 21 Aug 2024 12:28:21 +0200 Subject: [PATCH 5/8] Removed dependency on Nito.AsyncEx, replaced with channels. Added bounds to EventHorizon, limiting work in progress if consumer is slower than producer. --- .../Consumer/Connections/EventHorizonConnection.cs | 8 ++++---- .../Connections/IEventHorizonConnection.cs | 6 +++--- .../Processing/EventsFromEventHorizonFetcher.cs | 10 +++++----- Source/EventHorizon/Consumer/Subscription.cs | 14 ++++++-------- Source/EventHorizon/EventHorizon.csproj | 1 - Source/Events/Events.csproj | 1 - .../and_2_requests_are_handled.cs | 4 ++-- .../and_cancellation_is_requested.cs | 8 +++++++- .../given/all_dependencies.cs | 8 ++++---- .../for_StreamProcessor/given/all_dependencies.cs | 6 +++--- .../for_Subscription/given/all_dependencies.cs | 6 +++--- .../and_it_has_already_started.cs | 4 ++-- .../and_connection_completes.cs | 4 ++-- .../and_connection_response_changes_once.cs | 4 ++-- .../and_everything_works.cs | 4 ++-- .../and_it_has_already_started.cs | 4 ++-- .../and_receiving_events_fails.cs | 6 +++--- .../and_stream_processor_completes.cs | 4 ++-- .../and_stream_processor_fails.cs | 4 ++-- 19 files changed, 54 insertions(+), 52 deletions(-) diff --git a/Source/EventHorizon/Consumer/Connections/EventHorizonConnection.cs b/Source/EventHorizon/Consumer/Connections/EventHorizonConnection.cs index a44c1a045..bbf047d44 100644 --- a/Source/EventHorizon/Consumer/Connections/EventHorizonConnection.cs +++ b/Source/EventHorizon/Consumer/Connections/EventHorizonConnection.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Dolittle.Runtime.EventHorizon.Contracts; using Dolittle.Runtime.EventHorizon.UnBreaking; @@ -11,7 +12,6 @@ using Dolittle.Runtime.Protobuf; using Dolittle.Runtime.Services.Clients; using Microsoft.Extensions.Logging; -using Nito.AsyncEx; using ExecutionContext = Dolittle.Runtime.Execution.ExecutionContext; namespace Dolittle.Runtime.EventHorizon.Consumer.Connections; @@ -92,7 +92,7 @@ public async Task Connect( /// public Task StartReceivingEventsInto( - AsyncProducerConsumerQueue connectionToStreamProcessorQueue, + Channel connectionToStreamProcessorQueue, CancellationToken cancellationToken) { return _reverseCallClient.Handle( @@ -105,7 +105,7 @@ public Task StartReceivingEventsInto( } async Task HandleEventFromEventHorizon( - AsyncProducerConsumerQueue connectionToStreamProcessorQueue, + Channel connectionToStreamProcessorQueue, EventHorizonEvent @event, CancellationToken cancellationToken) { @@ -120,7 +120,7 @@ async Task HandleEventFromEventHorizon( PartitionId.None, false); - await connectionToStreamProcessorQueue.EnqueueAsync(streamEvent, cancellationToken).ConfigureAwait(false); + await connectionToStreamProcessorQueue.Writer.WriteAsync(streamEvent, cancellationToken).ConfigureAwait(false); return CreateSuccessfulResponse(); } catch (Exception exception) diff --git a/Source/EventHorizon/Consumer/Connections/IEventHorizonConnection.cs b/Source/EventHorizon/Consumer/Connections/IEventHorizonConnection.cs index 238a4d2ee..9ee60a53e 100644 --- a/Source/EventHorizon/Consumer/Connections/IEventHorizonConnection.cs +++ b/Source/EventHorizon/Consumer/Connections/IEventHorizonConnection.cs @@ -3,9 +3,9 @@ using System; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Dolittle.Runtime.Events.Store.Streams; -using Nito.AsyncEx; namespace Dolittle.Runtime.EventHorizon.Consumer.Connections; @@ -22,7 +22,7 @@ public interface IEventHorizonConnection : IDisposable /// A cancellation token that can be used to cancel the connection attempt. /// /// A task that, when resolved returns the from the connection to the producer Runtime. - /// If is true, the connection is started and should be called. + /// If is true, the connection is started and should be called. /// Else, the connection failed and should it should not be used. /// Task Connect( @@ -37,6 +37,6 @@ Task Connect( /// A cancellation token that can be used to close the connection. /// A task that represents the asynchronous operation. Task StartReceivingEventsInto( - AsyncProducerConsumerQueue connectionToStreamProcessorQueue, + Channel connectionToStreamProcessorQueue, CancellationToken cancellationToken); } diff --git a/Source/EventHorizon/Consumer/Processing/EventsFromEventHorizonFetcher.cs b/Source/EventHorizon/Consumer/Processing/EventsFromEventHorizonFetcher.cs index 197485abf..e2177fab6 100644 --- a/Source/EventHorizon/Consumer/Processing/EventsFromEventHorizonFetcher.cs +++ b/Source/EventHorizon/Consumer/Processing/EventsFromEventHorizonFetcher.cs @@ -4,11 +4,11 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Dolittle.Runtime.Rudimentary; using Dolittle.Runtime.Events.Store; using Dolittle.Runtime.Events.Store.Streams; -using Nito.AsyncEx; namespace Dolittle.Runtime.EventHorizon.Consumer.Processing; @@ -17,15 +17,15 @@ namespace Dolittle.Runtime.EventHorizon.Consumer.Processing; /// public class EventsFromEventHorizonFetcher : ICanFetchEventsFromStream, IStreamEventWatcher { - readonly AsyncProducerConsumerQueue _events; + readonly Channel _events; readonly IMetricsCollector _metrics; /// /// Initializes a new instance of the class. /// - /// The . + /// The . /// The system for collecting metrics. - public EventsFromEventHorizonFetcher(AsyncProducerConsumerQueue events, IMetricsCollector metrics) + public EventsFromEventHorizonFetcher(Channel events, IMetricsCollector metrics) { _events = events; _metrics = metrics; @@ -37,7 +37,7 @@ public async Task>> Fetch(StreamPosition position, try { //TODO: This can be improved by taking as many as possible instead of just the first - var @event = await _events.DequeueAsync(cancellationToken).ConfigureAwait(false); + var @event = await _events.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); _metrics.IncrementTotalEventsFetched(); return new[] { @event }; } diff --git a/Source/EventHorizon/Consumer/Subscription.cs b/Source/EventHorizon/Consumer/Subscription.cs index 93df81415..034c443ee 100644 --- a/Source/EventHorizon/Consumer/Subscription.cs +++ b/Source/EventHorizon/Consumer/Subscription.cs @@ -3,6 +3,7 @@ using System; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Dolittle.Runtime.EventHorizon.Consumer.Connections; using Dolittle.Runtime.EventHorizon.Consumer.Processing; @@ -12,7 +13,6 @@ using Dolittle.Runtime.Rudimentary; using Microservices; using Microsoft.Extensions.Logging; -using Nito.AsyncEx; using ExecutionContext = Dolittle.Runtime.Execution.ExecutionContext; namespace Dolittle.Runtime.EventHorizon.Consumer; @@ -46,7 +46,7 @@ public class Subscription : ISubscription /// The factory to use for creating stream processors that write the received events. /// The system to use for getting the next event to recieve for a subscription. /// The system for collecting metrics. - /// Thh . + /// The . /// The system for logging messages. public Subscription( SubscriptionId identifier, @@ -182,14 +182,12 @@ async Task ReceiveAndWriteEvents(IEventHorizonConnection connection, ConsentId c { _logger.SubscriptionIsReceivingAndWriting(Identifier, consent); using var processingCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - var connectionToStreamProcessorQueue = new AsyncProducerConsumerQueue(); + var connectionToStreamProcessorQueue = Channel.CreateBounded(1000); var writeEventsStreamProcessor = _streamProcessorFactory.Create(consent, Identifier, _executionContext, new EventsFromEventHorizonFetcher(connectionToStreamProcessorQueue, _processingMetrics)); var tasks = new TaskGroup( - writeEventsStreamProcessor.StartAndWait( - processingCancellationToken.Token), - connection.StartReceivingEventsInto( - connectionToStreamProcessorQueue, - processingCancellationToken.Token)); + writeEventsStreamProcessor.StartAndWait(processingCancellationToken.Token), + connection.StartReceivingEventsInto(connectionToStreamProcessorQueue, processingCancellationToken.Token) + ); State = SubscriptionState.Connected; await tasks.WaitForAllCancellingOnFirst(processingCancellationToken).ConfigureAwait(false); diff --git a/Source/EventHorizon/EventHorizon.csproj b/Source/EventHorizon/EventHorizon.csproj index e83ea01f8..a8b18bcf5 100644 --- a/Source/EventHorizon/EventHorizon.csproj +++ b/Source/EventHorizon/EventHorizon.csproj @@ -8,7 +8,6 @@ - diff --git a/Source/Events/Events.csproj b/Source/Events/Events.csproj index 65a9363ea..798cab868 100644 --- a/Source/Events/Events.csproj +++ b/Source/Events/Events.csproj @@ -17,7 +17,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_2_requests_are_handled.cs b/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_2_requests_are_handled.cs index 13c69cb00..30ba73acd 100644 --- a/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_2_requests_are_handled.cs +++ b/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_2_requests_are_handled.cs @@ -28,6 +28,6 @@ public class and_2_requests_are_handled : given.all_dependencies }; It should_be_completed = () => result.IsCompleted.ShouldBeTrue(); - It should_not_have_put_anything_in_event_queue = () => event_queue.OutputAvailable().ShouldBeTrue(); - It should_have_2_events_in_queue = () => event_queue.GetConsumingEnumerable().Count().ShouldEqual(2); + It should_not_have_put_anything_in_event_queue = () => event_queue.Reader.TryPeek(out _).ShouldBeTrue(); + It should_have_2_events_in_queue = () => event_queue.Reader.ReadAllAsync().ToListAsync().GetAwaiter().GetResult().Count.ShouldEqual(2); } \ No newline at end of file diff --git a/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_cancellation_is_requested.cs b/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_cancellation_is_requested.cs index 1ea63d505..1159a9927 100644 --- a/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_cancellation_is_requested.cs +++ b/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/and_cancellation_is_requested.cs @@ -22,5 +22,11 @@ public class and_cancellation_is_requested : given.all_dependencies }; It should_be_completed = () => result.IsCompleted.ShouldBeTrue(); - It should_not_have_put_anything_in_event_queue = () => event_queue.OutputAvailable().ShouldBeFalse(); + It should_not_have_put_anything_in_event_queue = () => + { + var wasRead = event_queue.Reader.TryPeek(out var evt); + evt.ShouldBeNull(); + wasRead.ShouldBeFalse(); + + }; } \ No newline at end of file diff --git a/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/given/all_dependencies.cs b/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/given/all_dependencies.cs index 1f0f4250c..a65dc3a99 100644 --- a/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/given/all_dependencies.cs +++ b/Specifications/EventHorizon/Consumer/Connections/for_EventHorizonConnection/when_starting_receiving_events_into/given/all_dependencies.cs @@ -7,8 +7,8 @@ using Machine.Specifications; using System.Threading; +using System.Threading.Channels; using Dolittle.Runtime.Events.Store.Streams; -using Nito.AsyncEx; using Dolittle.Runtime.Services.Clients; using Dolittle.Runtime.EventHorizon.Contracts; using System.Threading.Tasks; @@ -22,13 +22,13 @@ namespace Dolittle.Runtime.EventHorizon.Consumer.Connections.for_EventHorizonCon public class all_dependencies : for_EventHorizonConnection.given.all_dependencies { - protected static AsyncProducerConsumerQueue event_queue; + protected static Channel event_queue; protected static CancellationTokenSource cts; Establish context = () => { cts = new CancellationTokenSource(); - event_queue = new AsyncProducerConsumerQueue(); + event_queue = Channel.CreateBounded(1000); }; protected static void SetupReverseCallClient(params ConsumerRequest[] requests) @@ -45,7 +45,7 @@ protected static void SetupReverseCallClient(params ConsumerRequest[] requests) await Task.Delay(10).ConfigureAwait(false); i++; } - event_queue.CompleteAdding(); + event_queue.Writer.Complete(); })); protected static ConsumerRequest CreateRequest() { diff --git a/Specifications/EventHorizon/Consumer/Processing/for_StreamProcessor/given/all_dependencies.cs b/Specifications/EventHorizon/Consumer/Processing/for_StreamProcessor/given/all_dependencies.cs index 15576cdd1..98cda0022 100644 --- a/Specifications/EventHorizon/Consumer/Processing/for_StreamProcessor/given/all_dependencies.cs +++ b/Specifications/EventHorizon/Consumer/Processing/for_StreamProcessor/given/all_dependencies.cs @@ -8,7 +8,7 @@ using System.Threading; using Dolittle.Runtime.Events.Processing; using Dolittle.Runtime.Events.Store.Streams; -using Nito.AsyncEx; +using System.Threading.Channels; using Microsoft.Extensions.Logging.Abstractions; using ExecutionContext = Dolittle.Runtime.Execution.ExecutionContext; @@ -20,7 +20,7 @@ public class all_dependencies protected static CancellationToken cancellation_token; protected static Mock event_processor; protected static EventsFromEventHorizonFetcher event_fetcher; - protected static AsyncProducerConsumerQueue event_queue; + protected static Channel event_queue; protected static Mock stream_processor_states; protected static StreamProcessor stream_processor; protected static ExecutionContext execution_context; @@ -38,7 +38,7 @@ public class all_dependencies ); cancellation_token = CancellationToken.None; event_processor = new Mock(); - event_queue = new AsyncProducerConsumerQueue(); + event_queue = Channel.CreateBounded(1000); event_fetcher = new EventsFromEventHorizonFetcher(event_queue, Mock.Of()); stream_processor_states = new Mock(); stream_processor = new StreamProcessor( diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/given/all_dependencies.cs b/Specifications/EventHorizon/Consumer/for_Subscription/given/all_dependencies.cs index 2b3220967..2ea3665a9 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/given/all_dependencies.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/given/all_dependencies.cs @@ -12,7 +12,7 @@ using Dolittle.Runtime.Events.Store.Streams; using Dolittle.Runtime.Events.Store.EventHorizon; using Microservices; -using Nito.AsyncEx; +using System.Threading.Channels; using Polly; using ExecutionContext = Dolittle.Runtime.Execution.ExecutionContext; @@ -132,8 +132,8 @@ static void EstablishInitialMockSetups() } })); event_horizon_connection - .Setup(_ => _.StartReceivingEventsInto(Moq.It.IsAny>(), Moq.It.IsAny())) - .Returns, CancellationToken>((_, cancellationToken) => Task.Run(async () => + .Setup(_ => _.StartReceivingEventsInto(Moq.It.IsAny>(), Moq.It.IsAny())) + .Returns, CancellationToken>((_, cancellationToken) => Task.Run(async () => { while (!event_horizon_connection_cts.Token.IsCancellationRequested && !cancellationToken.IsCancellationRequested) { diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_fails/and_it_has_already_started.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_fails/and_it_has_already_started.cs index 26fead6d0..a4ddee78f 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_fails/and_it_has_already_started.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_fails/and_it_has_already_started.cs @@ -10,7 +10,7 @@ using Dolittle.Runtime.Protobuf; using Machine.Specifications; using Microservices; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_fails; @@ -49,7 +49,7 @@ public class and_it_has_already_started : given.all_dependencies Moq.It.IsAny()), Moq.Times.Never); It should_not_start_receiving_events = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.Never); It should_not_be_connected = () => subscription.State.ShouldNotEqual(SubscriptionState.Connected); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_completes.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_completes.cs index 3d516d8d9..a0207359d 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_completes.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_completes.cs @@ -6,7 +6,7 @@ using Dolittle.Runtime.EventHorizon.Consumer.Processing; using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -38,7 +38,7 @@ public class and_connection_completes : given.all_dependencies Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_start_receiving_events_at_least_twice = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_get_the_successfull_response = () => subscription.ConnectionResponse.Result.Success.ShouldBeTrue(); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_response_changes_once.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_response_changes_once.cs index 0718b32fd..b195b29ff 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_response_changes_once.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_connection_response_changes_once.cs @@ -8,7 +8,7 @@ using Dolittle.Runtime.Events.Store.EventHorizon; using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -80,7 +80,7 @@ public class and_connection_response_changes_once : given.all_dependencies Moq.It.IsAny()), Moq.Times.Exactly(2)); It should_start_receiving_events_twice = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.Exactly(2)); It should_be_connected = () => subscription.State.ShouldEqual(SubscriptionState.Connected); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_everything_works.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_everything_works.cs index 9d6fbaaf0..84e138dbe 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_everything_works.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_everything_works.cs @@ -6,7 +6,7 @@ using Dolittle.Runtime.EventHorizon.Consumer.Processing; using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -34,7 +34,7 @@ public class and_everything_works : given.all_dependencies Moq.It.IsAny()), Moq.Times.Once); It should_start_receiving_events_once = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.Once); It should_be_connected = () => subscription.State.ShouldEqual(SubscriptionState.Connected); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_it_has_already_started.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_it_has_already_started.cs index 8816c0169..fdc6c8109 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_it_has_already_started.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_it_has_already_started.cs @@ -8,7 +8,7 @@ using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; using Microservices; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -42,7 +42,7 @@ public class and_it_has_already_started : given.all_dependencies Moq.It.IsAny()), Moq.Times.Once); It should_start_receiving_events_once = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.Once); It should_be_connected = () => subscription.State.ShouldEqual(SubscriptionState.Connected); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_receiving_events_fails.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_receiving_events_fails.cs index b171f5c24..fd445f6ed 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_receiving_events_fails.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_receiving_events_fails.cs @@ -7,7 +7,7 @@ using Dolittle.Runtime.EventHorizon.Consumer.Processing; using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -16,7 +16,7 @@ public class and_receiving_events_fails : given.all_dependencies Establish context = () => { event_horizon_connection - .Setup(_ => _.StartReceivingEventsInto(Moq.It.IsAny>(), Moq.It.IsAny())) + .Setup(_ => _.StartReceivingEventsInto(Moq.It.IsAny>(), Moq.It.IsAny())) .Returns(Task.Run(() => throw new Exception())); }; Because of = () => @@ -41,7 +41,7 @@ public class and_receiving_events_fails : given.all_dependencies Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_start_receiving_events_at_least_twice = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_get_the_successfull_response = () => subscription.ConnectionResponse.Result.Success.ShouldBeTrue(); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_completes.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_completes.cs index 3f1d43aa9..39628285a 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_completes.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_completes.cs @@ -6,7 +6,7 @@ using Dolittle.Runtime.EventHorizon.Consumer.Processing; using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -38,7 +38,7 @@ public class and_stream_processor_completes : given.all_dependencies Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_start_receiving_events_at_least_twice = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_get_the_successfull_response = () => subscription.ConnectionResponse.Result.Success.ShouldBeTrue(); diff --git a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_fails.cs b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_fails.cs index f58ee7050..0f67c1f08 100644 --- a/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_fails.cs +++ b/Specifications/EventHorizon/Consumer/for_Subscription/when_starting/and_connection_succeeds/and_stream_processor_fails.cs @@ -7,7 +7,7 @@ using Dolittle.Runtime.EventHorizon.Consumer.Processing; using Dolittle.Runtime.Events.Store.Streams; using Machine.Specifications; -using Nito.AsyncEx; +using System.Threading.Channels; namespace Dolittle.Runtime.EventHorizon.Consumer.for_Subscription.when_starting.and_connection_succeeds; @@ -41,7 +41,7 @@ public class and_stream_processor_fails : given.all_dependencies Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_start_receiving_events_at_least_twice = () => event_horizon_connection.Verify(_ => _.StartReceivingEventsInto( - Moq.It.IsAny>(), + Moq.It.IsAny>(), Moq.It.IsAny()), Moq.Times.AtLeast(2)); It should_get_the_successfull_response = () => subscription.ConnectionResponse.Result.Success.ShouldBeTrue(); From 0b1d59ffa5839440e4f10d8225c0e2f86e21789a Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Tue, 17 Sep 2024 12:08:02 +0200 Subject: [PATCH 6/8] Added support for sparse event logs, allowing the runtime to continue operating even with truncated / scavenged old events. Useful when clearing out events that are no longer needed. --- Directory.Packages.props | 1 + .../CommittedEventsFetcher.cs | 17 +++-- .../Events/EventLogMetadata.cs | 19 +++++ .../Persistence/CommitWriter.cs | 22 +++++- .../Persistence/EventLogOffsetStore.cs | 60 +++++++++++++++ .../Persistence/IEventLogOffsetStore.cs | 33 ++++++++ .../Events.Store.MongoDB/Streams/IStreams.cs | 4 +- .../Events.Store.MongoDB/Streams/Streams.cs | 2 + Source/Events/Store/ScopeId.cs | 4 +- .../EventStoreOffsetTests.cs | 75 +++++++++++++++++++ .../Events.Store.MongoDB.Tests.csproj | 1 + .../MongoDatabaseFixture.cs | 44 +++++++++++ 12 files changed, 272 insertions(+), 10 deletions(-) create mode 100644 Source/Events.Store.MongoDB/Events/EventLogMetadata.cs create mode 100644 Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs create mode 100644 Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs create mode 100644 Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs create mode 100644 Specifications/Events.Store.MongoDB.Tests/MongoDatabaseFixture.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index e123b3b0c..5ab968212 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -69,6 +69,7 @@ + diff --git a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs index bd974d50e..116a4625e 100644 --- a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs +++ b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs @@ -12,6 +12,7 @@ using Dolittle.Runtime.DependencyInversion.Scoping; using Dolittle.Runtime.Events.Store.MongoDB.Events; using Dolittle.Runtime.Events.Store.MongoDB.Legacy; +using Dolittle.Runtime.Events.Store.MongoDB.Persistence; using Dolittle.Runtime.Events.Store.MongoDB.Projections; using Dolittle.Runtime.Events.Store.MongoDB.Streams; using Dolittle.Runtime.MongoDB; @@ -37,6 +38,7 @@ public class CommittedEventsFetcher : IFetchCommittedEvents readonly IAggregateRoots _aggregateRoots; readonly ILogger _logger; readonly IEventContentConverter _contentConverter; + readonly IEventLogOffsetStore _eventLogOffsetStore; /// /// Initializes a new instance of the class. @@ -45,12 +47,14 @@ public class CommittedEventsFetcher : IFetchCommittedEvents /// The . /// The . /// The . + /// The . /// public CommittedEventsFetcher( IStreams streams, IEventConverter eventConverter, IAggregateRoots aggregateRoots, IEventContentConverter contentConverter, + IEventLogOffsetStore eventLogOffsetStore, ILogger logger) { _streams = streams; @@ -58,18 +62,21 @@ public CommittedEventsFetcher( _aggregateRoots = aggregateRoots; _contentConverter = contentConverter; _logger = logger; + _eventLogOffsetStore = eventLogOffsetStore; } /// public async Task FetchNextSequenceNumber(ScopeId scope, CancellationToken cancellationToken) { + var storedEventOffset = await _eventLogOffsetStore.GetNextOffset(scope, cancellationToken); + var eventLog = await GetEventLog(scope, cancellationToken).ConfigureAwait(false); var eventCount = (ulong)await eventLog.CountDocumentsAsync( _eventFilter.Empty, cancellationToken: cancellationToken) .ConfigureAwait(false); - if (eventCount == 0) return 0ul; // No events means no need to double-check + if (storedEventOffset == 0 && eventCount == 0) return 0ul; // No events means no need to double-check var lastEvent = await eventLog .Find(_eventFilter.Empty) @@ -80,11 +87,11 @@ public async Task FetchNextSequenceNumber(ScopeId scope, if (lastEvent is null) { // Should not be possible - _logger.LogError("No last event found, but event count was {EventCount}", eventCount); - return 0ul; + _logger.LogError("No last event found"); + return storedEventOffset; } - - var nextSequenceNumber = lastEvent.EventLogSequenceNumber + 1; + + var nextSequenceNumber = Math.Max(lastEvent.EventLogSequenceNumber + 1, storedEventOffset); if (nextSequenceNumber != eventCount) { diff --git a/Source/Events.Store.MongoDB/Events/EventLogMetadata.cs b/Source/Events.Store.MongoDB/Events/EventLogMetadata.cs new file mode 100644 index 000000000..87480cba8 --- /dev/null +++ b/Source/Events.Store.MongoDB/Events/EventLogMetadata.cs @@ -0,0 +1,19 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using MongoDB.Bson; +using MongoDB.Bson.Serialization.Attributes; + +namespace Dolittle.Runtime.Events.Store.MongoDB.Events; + +/// +/// Keeps the offset metadata for an event log. +/// +public class EventLogMetadata +{ + [BsonId] public required Guid Scope { get; init; } + + [BsonRepresentation(BsonType.Decimal128)] + public ulong NextEventOffset { get; init; } +} diff --git a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs index c714988cb..4bd0e26f3 100644 --- a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs +++ b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs @@ -24,6 +24,7 @@ public class CommitWriter : IPersistCommits readonly IStreamEventWatcher _streamWatcher; readonly IConvertCommitToEvents _commitConverter; readonly IUpdateAggregateVersionsAfterCommit _aggregateVersions; + readonly IEventLogOffsetStore _eventLogOffsetStore; /// /// Initializes a new instance of the class. @@ -31,12 +32,21 @@ public class CommitWriter : IPersistCommits /// The . /// The . /// The . - public CommitWriter(IStreams streams, IStreamEventWatcher streamWatcher, IConvertCommitToEvents commitConverter, IUpdateAggregateVersionsAfterCommit aggregateVersions) + /// The . + /// The . + public CommitWriter( + IStreams streams, + IStreamEventWatcher streamWatcher, + IConvertCommitToEvents commitConverter, + IUpdateAggregateVersionsAfterCommit aggregateVersions, + IEventLogOffsetStore eventLogOffsetStore + ) { _streams = streams; _streamWatcher = streamWatcher; _commitConverter = commitConverter; _aggregateVersions = aggregateVersions; + _eventLogOffsetStore = eventLogOffsetStore; } /// @@ -47,7 +57,9 @@ public async Task Persist(Commit commit, CancellationToken cancellationToke { return new NoEventsToCommit(); } - using var session = await _streams.StartSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + + using var session = + await _streams.StartSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false); try { session.StartTransaction(); @@ -55,7 +67,12 @@ await _streams.DefaultEventLog.InsertManyAsync( session, eventsToStore, cancellationToken: cancellationToken).ConfigureAwait(false); + + var nextOffset = commit.LastSequenceNumber + 1; + await _eventLogOffsetStore.UpdateOffset(session, ScopeId.Default, nextOffset, cancellationToken) + .ConfigureAwait(false); await _aggregateVersions.UpdateAggregateVersions(session, commit, cancellationToken).ConfigureAwait(false); + await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false); //TODO: Notifying for events should be a concern handled by actors _streamWatcher.NotifyForEvent(ScopeId.Default, StreamId.EventLog, commit.LastSequenceNumber.Value); @@ -71,6 +88,5 @@ await _streams.DefaultEventLog.InsertManyAsync( await session.AbortTransactionAsync(cancellationToken).ConfigureAwait(false); return ex; } - } } diff --git a/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs b/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs new file mode 100644 index 000000000..8b345d653 --- /dev/null +++ b/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs @@ -0,0 +1,60 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; +using System.Threading.Tasks; +using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.DependencyInversion.Scoping; +using Dolittle.Runtime.Events.Store.MongoDB.Events; +using Dolittle.Runtime.Events.Store.Persistence; +using MongoDB.Driver; + +namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; + +[Singleton, PerTenant] +public class EventLogOffsetStore: EventStoreConnection, IEventLogOffsetStore +{ + static readonly FilterDefinitionBuilder _filterBuilder = new(); + static readonly FilterDefinition _defaultScopeFilter = _filterBuilder.Eq(metadata => metadata.Scope, ScopeId.Default.Value); + + + const string EventLogMetadataCollectionName = "event-log-metadata"; + + public IMongoCollection Collection { get; } + + public EventLogOffsetStore(IDatabaseConnection connection):base(connection) + { + Collection = Database.GetCollection(EventLogMetadataCollectionName); + } + + public Task UpdateOffset(IClientSessionHandle session, ScopeId scopeId, ulong nextEventOffset, + CancellationToken cancellationToken) + { + var metadata = new EventLogMetadata + { + Scope = scopeId.Value, + NextEventOffset = nextEventOffset, + }; + + var updateDefinition = new UpdateDefinitionBuilder() + .SetOnInsert(it => it.Scope, metadata.Scope) + .Set(it => it.NextEventOffset, metadata.NextEventOffset); + + return Collection.UpdateOneAsync( + session:session, + filter: GetFilter(scopeId), + updateDefinition, + options: new UpdateOptions + { + IsUpsert = true, + }, cancellationToken); + } + + public Task GetNextOffset(ScopeId scopeId, CancellationToken cancellationToken) + => Collection + .Find(GetFilter(scopeId)) + .Project(metadata => metadata.NextEventOffset) + .FirstOrDefaultAsync(cancellationToken); + + static FilterDefinition GetFilter(ScopeId scopeId) => scopeId.IsDefaultScope ? _defaultScopeFilter : _filterBuilder.Eq(metadata => metadata.Scope, scopeId.Value); +} diff --git a/Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs b/Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs new file mode 100644 index 000000000..70ef4ee96 --- /dev/null +++ b/Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs @@ -0,0 +1,33 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; +using System.Threading.Tasks; +using Dolittle.Runtime.Events.Store.Persistence; +using MongoDB.Driver; + +namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; + +/// +/// Defines a system that can update the high watermark offset for an event log. +/// +public interface IEventLogOffsetStore +{ + /// + /// Updates the next offset for a given event log. + /// + /// The for the MongoDB transaction. + /// The . + /// The next offset to write to this eventlog + /// The . + /// A that, when resolved, returns the result of the operation. + Task UpdateOffset(IClientSessionHandle session, ScopeId scopeId, ulong nextOffset, CancellationToken cancellationToken); + + /// + /// Gets the next offset for a given event log. + /// + /// + /// + /// + public Task GetNextOffset(ScopeId scopeId, CancellationToken cancellationToken); +} diff --git a/Source/Events.Store.MongoDB/Streams/IStreams.cs b/Source/Events.Store.MongoDB/Streams/IStreams.cs index 8e5b51232..0e06b68b8 100644 --- a/Source/Events.Store.MongoDB/Streams/IStreams.cs +++ b/Source/Events.Store.MongoDB/Streams/IStreams.cs @@ -18,6 +18,8 @@ public interface IStreams : IEventStoreConnection /// IMongoCollection DefaultEventLog { get; } + IMongoCollection EventLogMetadata { get; } + /// /// Gets a non-public and non-event-log Stream. /// @@ -50,4 +52,4 @@ public interface IStreams : IEventStoreConnection /// The . /// A that, when resolved, returns a with . Task> GetEventLog(ScopeId scopeId, CancellationToken token); -} \ No newline at end of file +} diff --git a/Source/Events.Store.MongoDB/Streams/Streams.cs b/Source/Events.Store.MongoDB/Streams/Streams.cs index 085d455f8..4bb49a1c3 100644 --- a/Source/Events.Store.MongoDB/Streams/Streams.cs +++ b/Source/Events.Store.MongoDB/Streams/Streams.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Dolittle.Runtime.DependencyInversion.Lifecycle; using Dolittle.Runtime.DependencyInversion.Scoping; +using Dolittle.Runtime.Events.Store.MongoDB.Events; using Dolittle.Runtime.Events.Store.Streams; using Microsoft.Extensions.Logging; using MongoDB.Driver; @@ -42,6 +43,7 @@ public Streams(IDatabaseConnection connection, ILogger logger) /// public IMongoCollection DefaultEventLog { get; } + public IMongoCollection EventLogMetadata { get; } /// public Task> Get(ScopeId scopeId, StreamId streamId, CancellationToken token) diff --git a/Source/Events/Store/ScopeId.cs b/Source/Events/Store/ScopeId.cs index 0afa238c6..65feaccef 100644 --- a/Source/Events/Store/ScopeId.cs +++ b/Source/Events/Store/ScopeId.cs @@ -27,4 +27,6 @@ public record ScopeId(Guid Value) : ConceptAs(Value) /// /// ScopeId as . public static implicit operator ScopeId(string scopeId) => new(Guid.Parse(scopeId)); -} \ No newline at end of file + + public bool IsDefaultScope => Value.Equals(Guid.Empty); +} diff --git a/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs b/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs new file mode 100644 index 000000000..c448d3f17 --- /dev/null +++ b/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs @@ -0,0 +1,75 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Dolittle.Runtime.Events.Store; +using Dolittle.Runtime.Events.Store.MongoDB.Persistence; +using FluentAssertions; +using Xunit; + +namespace Events.Store.MongoDB.Tests; + +public class EventStoreOffsetTests(MongoDatabaseFixture fixture): IClassFixture +{ + [Fact] + public async Task WhenNoOffsetExists() + { + var nextOffset = await Sut.GetNextOffset(new ScopeId(Guid.NewGuid()), CancellationToken.None); + + nextOffset.Should().Be(0); + } + + [Fact] + public async Task WhenStoringDefaultScopeOffset() + { + ulong nextOffset = 9; + await StoreOffset(ScopeId.Default, nextOffset); + + var nextOffsetRetrieved = await Sut.GetNextOffset(ScopeId.Default, CancellationToken.None); + + nextOffsetRetrieved.Should().Be(nextOffset); + } + + [Fact] + public async Task WhenUpdatingOffset() + { + var scope = new ScopeId(Guid.NewGuid()); + ulong nextOffset = 99; + await StoreOffset(scope, nextOffset); + await StoreOffset(scope, ++nextOffset); + + var nextOffsetRetrieved = await Sut.GetNextOffset(scope, CancellationToken.None); + + nextOffsetRetrieved.Should().Be(nextOffset); + } + + [Fact] + public async Task WhenRollingBackOffset() + { + var scope = new ScopeId(Guid.NewGuid()); + ulong nextOffset = 999; + await StoreOffset(scope, nextOffset); + await StoreOffsetButRollBack(scope, nextOffset + 1); + + var nextOffsetRetrieved = await Sut.GetNextOffset(scope, CancellationToken.None); + + nextOffsetRetrieved.Should().Be(nextOffset); + } + + private async Task StoreOffset(ScopeId scope, ulong nextOffset) + { + using var session = await fixture.Connection.MongoClient.StartSessionAsync(); + session.StartTransaction(); + await Sut.UpdateOffset(session, scope, nextOffset, CancellationToken.None); + await session.CommitTransactionAsync(); + } + + private async Task StoreOffsetButRollBack(ScopeId scope, ulong nextOffset) + { + using var session = await fixture.Connection.MongoClient.StartSessionAsync(); + session.StartTransaction(); + await Sut.UpdateOffset(session, scope, nextOffset, CancellationToken.None); + await session.AbortTransactionAsync(); + } + + private EventLogOffsetStore Sut { get; } = new(fixture.Connection); +} \ No newline at end of file diff --git a/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj b/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj index 795b5beb4..f8fa88786 100644 --- a/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj +++ b/Specifications/Events.Store.MongoDB.Tests/Events.Store.MongoDB.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/Specifications/Events.Store.MongoDB.Tests/MongoDatabaseFixture.cs b/Specifications/Events.Store.MongoDB.Tests/MongoDatabaseFixture.cs new file mode 100644 index 000000000..42afdfed0 --- /dev/null +++ b/Specifications/Events.Store.MongoDB.Tests/MongoDatabaseFixture.cs @@ -0,0 +1,44 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Dolittle.Runtime.Events.Store.MongoDB; +using Dolittle.Runtime.Events.Store.MongoDB.Legacy; +using Microsoft.Extensions.Options; +using Testcontainers.MongoDb; +using Xunit; + +namespace Events.Store.MongoDB.Tests; + +public class MongoDatabaseFixture: IAsyncLifetime +{ + IDatabaseConnection? _connection; + + public IDatabaseConnection Connection => _connection ?? throw new Exception("Connection not initialized"); + + MongoDbContainer MongoDbContainer { get; } + + public MongoDatabaseFixture() + { + MongoDbContainer = new MongoDbBuilder() + .WithReplicaSet("rs0") + .Build(); + } + + public async Task InitializeAsync() + { + await MongoDbContainer.StartAsync(); + var connectionString = MongoDbContainer.GetConnectionString(); + var eventStoreConfiguration = new EventStoreConfiguration + { + ConnectionString = connectionString, + Database = "EventStore" + }; + _connection = new DatabaseConnection(Options.Create(eventStoreConfiguration), new BackwardsCompatibility()); + } + + public async Task DisposeAsync() + { + _connection = null; + await MongoDbContainer.DisposeAsync(); + } +} \ No newline at end of file From ab1b4d23bb9bcf9d4c15d5a63ace6cf8affab08d Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Tue, 17 Sep 2024 13:23:55 +0200 Subject: [PATCH 7/8] EventLog offset metadata init --- .../Persistence/EventLogOffsetStore.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs b/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs index 8b345d653..d20b2f519 100644 --- a/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs +++ b/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs @@ -25,6 +25,17 @@ public class EventLogOffsetStore: EventStoreConnection, IEventLogOffsetStore public EventLogOffsetStore(IDatabaseConnection connection):base(connection) { Collection = Database.GetCollection(EventLogMetadataCollectionName); + + CreateCollectionIfNotExists(); + } + + void CreateCollectionIfNotExists() + { + var collectionNames = Database.ListCollectionNames().ToList(); + if (!collectionNames.Contains(EventLogMetadataCollectionName)) + { + Database.CreateCollection(EventLogMetadataCollectionName); + } } public Task UpdateOffset(IClientSessionHandle session, ScopeId scopeId, ulong nextEventOffset, From 6a88d9ba469cadbbcf2f39530b520d05407b2050 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Tue, 17 Sep 2024 18:12:10 +0200 Subject: [PATCH 8/8] Added support for sparse event streams & sparse scoped streams --- .../CommittedEventsFetcher.cs | 12 ++--- .../EventHorizon/EventHorizonEventsWriter.cs | 2 + ...{EventLogMetadata.cs => StreamMetadata.cs} | 7 ++- .../Persistence/CommitWriter.cs | 12 ++--- ...EventLogOffsetStore.cs => IOffsetStore.cs} | 23 +++++----- ...{EventLogOffsetStore.cs => OffsetStore.cs} | 45 ++++++++++--------- .../Streams/CollectionExtensions.cs | 17 +++++++ .../Streams/EventsToStreamsWriter.cs | 43 +++++++++++++----- .../Events.Store.MongoDB/Streams/IStreams.cs | 2 - .../Streams/IWriteEventsToStreamCollection.cs | 2 +- .../Events.Store.MongoDB/Streams/Streams.cs | 4 +- .../EventStoreOffsetTests.cs | 33 +++++++------- 12 files changed, 122 insertions(+), 80 deletions(-) rename Source/Events.Store.MongoDB/Events/{EventLogMetadata.cs => StreamMetadata.cs} (73%) rename Source/Events.Store.MongoDB/Persistence/{IEventLogOffsetStore.cs => IOffsetStore.cs} (50%) rename Source/Events.Store.MongoDB/Persistence/{EventLogOffsetStore.cs => OffsetStore.cs} (51%) create mode 100644 Source/Events.Store.MongoDB/Streams/CollectionExtensions.cs diff --git a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs index 116a4625e..736e224cd 100644 --- a/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs +++ b/Source/Events.Store.MongoDB/CommittedEventsFetcher.cs @@ -38,7 +38,7 @@ public class CommittedEventsFetcher : IFetchCommittedEvents readonly IAggregateRoots _aggregateRoots; readonly ILogger _logger; readonly IEventContentConverter _contentConverter; - readonly IEventLogOffsetStore _eventLogOffsetStore; + readonly IOffsetStore _offsetStore; /// /// Initializes a new instance of the class. @@ -47,14 +47,14 @@ public class CommittedEventsFetcher : IFetchCommittedEvents /// The . /// The . /// The . - /// The . + /// The . /// public CommittedEventsFetcher( IStreams streams, IEventConverter eventConverter, IAggregateRoots aggregateRoots, IEventContentConverter contentConverter, - IEventLogOffsetStore eventLogOffsetStore, + IOffsetStore offsetStore, ILogger logger) { _streams = streams; @@ -62,15 +62,15 @@ public CommittedEventsFetcher( _aggregateRoots = aggregateRoots; _contentConverter = contentConverter; _logger = logger; - _eventLogOffsetStore = eventLogOffsetStore; + _offsetStore = offsetStore; } /// public async Task FetchNextSequenceNumber(ScopeId scope, CancellationToken cancellationToken) { - var storedEventOffset = await _eventLogOffsetStore.GetNextOffset(scope, cancellationToken); var eventLog = await GetEventLog(scope, cancellationToken).ConfigureAwait(false); + var storedEventOffset = await _offsetStore.GetNextOffset(eventLog.CollectionNamespace.CollectionName, null, cancellationToken); var eventCount = (ulong)await eventLog.CountDocumentsAsync( _eventFilter.Empty, cancellationToken: cancellationToken) @@ -95,7 +95,7 @@ public async Task FetchNextSequenceNumber(ScopeId scope, if (nextSequenceNumber != eventCount) { - _logger.LogInformation("Sparse event log: Last event sequence number was {LastEventSequenceNumber}, but event count was {EventCount}", lastEvent.EventLogSequenceNumber, + _logger.LogInformation("Sparse event log: Next event sequence number was {NextSequenceNumber}, but event count was {EventCount}", nextSequenceNumber, eventCount); } diff --git a/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs b/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs index ef8b70db6..b08e77963 100644 --- a/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs +++ b/Source/Events.Store.MongoDB/EventHorizon/EventHorizonEventsWriter.cs @@ -6,8 +6,10 @@ using System.Threading.Tasks; using Dolittle.Runtime.Events.Store.EventHorizon; using Dolittle.Runtime.Events.Store.MongoDB.Events; +using Dolittle.Runtime.Events.Store.MongoDB.Persistence; using Dolittle.Runtime.Events.Store.MongoDB.Streams; using Dolittle.Runtime.Events.Store.Streams; +using MongoDB.Driver; namespace Dolittle.Runtime.Events.Store.MongoDB.EventHorizon; diff --git a/Source/Events.Store.MongoDB/Events/EventLogMetadata.cs b/Source/Events.Store.MongoDB/Events/StreamMetadata.cs similarity index 73% rename from Source/Events.Store.MongoDB/Events/EventLogMetadata.cs rename to Source/Events.Store.MongoDB/Events/StreamMetadata.cs index 87480cba8..5e533ffd2 100644 --- a/Source/Events.Store.MongoDB/Events/EventLogMetadata.cs +++ b/Source/Events.Store.MongoDB/Events/StreamMetadata.cs @@ -1,18 +1,17 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System; using MongoDB.Bson; using MongoDB.Bson.Serialization.Attributes; namespace Dolittle.Runtime.Events.Store.MongoDB.Events; /// -/// Keeps the offset metadata for an event log. +/// Keeps the offset metadata for a specific stream. /// -public class EventLogMetadata +public class StreamMetadata { - [BsonId] public required Guid Scope { get; init; } + [BsonId] public required string StreamName { get; init; } [BsonRepresentation(BsonType.Decimal128)] public ulong NextEventOffset { get; init; } diff --git a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs index 4bd0e26f3..3df30752f 100644 --- a/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs +++ b/Source/Events.Store.MongoDB/Persistence/CommitWriter.cs @@ -24,7 +24,7 @@ public class CommitWriter : IPersistCommits readonly IStreamEventWatcher _streamWatcher; readonly IConvertCommitToEvents _commitConverter; readonly IUpdateAggregateVersionsAfterCommit _aggregateVersions; - readonly IEventLogOffsetStore _eventLogOffsetStore; + readonly IOffsetStore _offsetStore; /// /// Initializes a new instance of the class. @@ -33,20 +33,20 @@ public class CommitWriter : IPersistCommits /// The . /// The . /// The . - /// The . + /// The . public CommitWriter( IStreams streams, IStreamEventWatcher streamWatcher, IConvertCommitToEvents commitConverter, IUpdateAggregateVersionsAfterCommit aggregateVersions, - IEventLogOffsetStore eventLogOffsetStore + IOffsetStore offsetStore ) { _streams = streams; _streamWatcher = streamWatcher; _commitConverter = commitConverter; _aggregateVersions = aggregateVersions; - _eventLogOffsetStore = eventLogOffsetStore; + _offsetStore = offsetStore; } /// @@ -68,8 +68,8 @@ await _streams.DefaultEventLog.InsertManyAsync( eventsToStore, cancellationToken: cancellationToken).ConfigureAwait(false); - var nextOffset = commit.LastSequenceNumber + 1; - await _eventLogOffsetStore.UpdateOffset(session, ScopeId.Default, nextOffset, cancellationToken) + var nextSequenceNumber = commit.LastSequenceNumber + 1; + await _offsetStore.UpdateOffset(Streams.Streams.EventLogCollectionName, session, nextSequenceNumber, cancellationToken) .ConfigureAwait(false); await _aggregateVersions.UpdateAggregateVersions(session, commit, cancellationToken).ConfigureAwait(false); diff --git a/Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs b/Source/Events.Store.MongoDB/Persistence/IOffsetStore.cs similarity index 50% rename from Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs rename to Source/Events.Store.MongoDB/Persistence/IOffsetStore.cs index 70ef4ee96..6aca07d06 100644 --- a/Source/Events.Store.MongoDB/Persistence/IEventLogOffsetStore.cs +++ b/Source/Events.Store.MongoDB/Persistence/IOffsetStore.cs @@ -3,7 +3,6 @@ using System.Threading; using System.Threading.Tasks; -using Dolittle.Runtime.Events.Store.Persistence; using MongoDB.Driver; namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; @@ -11,23 +10,25 @@ namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; /// /// Defines a system that can update the high watermark offset for an event log. /// -public interface IEventLogOffsetStore +public interface IOffsetStore { /// - /// Updates the next offset for a given event log. + /// Updates the next offset for a given event log / stream. /// + /// /// The for the MongoDB transaction. - /// The . - /// The next offset to write to this eventlog + /// The offset that was just written /// The . /// A that, when resolved, returns the result of the operation. - Task UpdateOffset(IClientSessionHandle session, ScopeId scopeId, ulong nextOffset, CancellationToken cancellationToken); - + Task UpdateOffset(string stream, IClientSessionHandle session, ulong writtenOffset, + CancellationToken cancellationToken); + /// - /// Gets the next offset for a given event log. + /// Gets the next offset for a given stream /// - /// - /// + /// The stream to get the next offset for + /// The for the MongoDB transaction.(Optional) + /// The . /// - public Task GetNextOffset(ScopeId scopeId, CancellationToken cancellationToken); + public Task GetNextOffset(string stream, IClientSessionHandle? session, CancellationToken cancellationToken); } diff --git a/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs b/Source/Events.Store.MongoDB/Persistence/OffsetStore.cs similarity index 51% rename from Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs rename to Source/Events.Store.MongoDB/Persistence/OffsetStore.cs index d20b2f519..26c5aaeaa 100644 --- a/Source/Events.Store.MongoDB/Persistence/EventLogOffsetStore.cs +++ b/Source/Events.Store.MongoDB/Persistence/OffsetStore.cs @@ -6,25 +6,23 @@ using Dolittle.Runtime.DependencyInversion.Lifecycle; using Dolittle.Runtime.DependencyInversion.Scoping; using Dolittle.Runtime.Events.Store.MongoDB.Events; -using Dolittle.Runtime.Events.Store.Persistence; using MongoDB.Driver; namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence; [Singleton, PerTenant] -public class EventLogOffsetStore: EventStoreConnection, IEventLogOffsetStore +public class OffsetStore : EventStoreConnection, IOffsetStore { - static readonly FilterDefinitionBuilder _filterBuilder = new(); - static readonly FilterDefinition _defaultScopeFilter = _filterBuilder.Eq(metadata => metadata.Scope, ScopeId.Default.Value); - - + static readonly FilterDefinitionBuilder _filterBuilder = new(); + + const string EventLogMetadataCollectionName = "event-log-metadata"; - public IMongoCollection Collection { get; } + IMongoCollection Collection { get; } - public EventLogOffsetStore(IDatabaseConnection connection):base(connection) + public OffsetStore(IDatabaseConnection connection) : base(connection) { - Collection = Database.GetCollection(EventLogMetadataCollectionName); + Collection = Database.GetCollection(EventLogMetadataCollectionName); CreateCollectionIfNotExists(); } @@ -38,22 +36,22 @@ void CreateCollectionIfNotExists() } } - public Task UpdateOffset(IClientSessionHandle session, ScopeId scopeId, ulong nextEventOffset, + public Task UpdateOffset(string stream, IClientSessionHandle session, ulong nextEventOffset, CancellationToken cancellationToken) { - var metadata = new EventLogMetadata + var metadata = new StreamMetadata { - Scope = scopeId.Value, + StreamName = stream, NextEventOffset = nextEventOffset, }; - - var updateDefinition = new UpdateDefinitionBuilder() - .SetOnInsert(it => it.Scope, metadata.Scope) + + var updateDefinition = new UpdateDefinitionBuilder() + .SetOnInsert(it => it.StreamName, stream) .Set(it => it.NextEventOffset, metadata.NextEventOffset); return Collection.UpdateOneAsync( - session:session, - filter: GetFilter(scopeId), + session: session, + filter: GetFilter(stream), updateDefinition, options: new UpdateOptions { @@ -61,11 +59,16 @@ public Task UpdateOffset(IClientSessionHandle session, ScopeId scopeId, ulong ne }, cancellationToken); } - public Task GetNextOffset(ScopeId scopeId, CancellationToken cancellationToken) - => Collection - .Find(GetFilter(scopeId)) + public Task GetNextOffset(string stream, IClientSessionHandle? session, CancellationToken cancellationToken) + { + var find = session is not null + ? Collection.Find(session, GetFilter(stream)) + : Collection.Find(GetFilter(stream)); + return find .Project(metadata => metadata.NextEventOffset) .FirstOrDefaultAsync(cancellationToken); + } - static FilterDefinition GetFilter(ScopeId scopeId) => scopeId.IsDefaultScope ? _defaultScopeFilter : _filterBuilder.Eq(metadata => metadata.Scope, scopeId.Value); + static FilterDefinition GetFilter(string streamName) => + _filterBuilder.Eq(metadata => metadata.StreamName, streamName); } diff --git a/Source/Events.Store.MongoDB/Streams/CollectionExtensions.cs b/Source/Events.Store.MongoDB/Streams/CollectionExtensions.cs new file mode 100644 index 000000000..f305ac18f --- /dev/null +++ b/Source/Events.Store.MongoDB/Streams/CollectionExtensions.cs @@ -0,0 +1,17 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; +using System.Threading.Tasks; +using MongoDB.Driver; + +namespace Dolittle.Runtime.Events.Store.MongoDB.Streams; + +public static class CollectionExtensions +{ + public static async Task CountDocuments(this IMongoCollection collection, IClientSessionHandle session, CancellationToken cancellationToken) => + (ulong) await collection.CountDocumentsAsync( + session, + Builders.Filter.Empty, + cancellationToken: cancellationToken).ConfigureAwait(false); +} diff --git a/Source/Events.Store.MongoDB/Streams/EventsToStreamsWriter.cs b/Source/Events.Store.MongoDB/Streams/EventsToStreamsWriter.cs index 5880ecf32..32ad64978 100644 --- a/Source/Events.Store.MongoDB/Streams/EventsToStreamsWriter.cs +++ b/Source/Events.Store.MongoDB/Streams/EventsToStreamsWriter.cs @@ -10,6 +10,7 @@ using Dolittle.Runtime.DependencyInversion; using Dolittle.Runtime.Events.Processing; using Dolittle.Runtime.Events.Store.MongoDB.Events; +using Dolittle.Runtime.Events.Store.MongoDB.Persistence; using Dolittle.Runtime.Events.Store.Streams; using Microsoft.Extensions.Logging; using MongoDB.Driver; @@ -17,6 +18,7 @@ namespace Dolittle.Runtime.Events.Store.MongoDB.Streams; + /// /// Represents an implementation of and .. /// @@ -26,18 +28,21 @@ public class EventsToStreamsWriter : IWriteEventsToStreamCollection, IWriteEvent readonly IStreams _streams; readonly IEventConverter _eventConverter; readonly ILogger _logger; + readonly IOffsetStore _offsetStore; /// /// Initializes a new instance of the class. /// /// The . /// The . + /// The for storing next written offsets. /// An . - public EventsToStreamsWriter(IStreams streams, IEventConverter eventConverter, ILogger logger) + public EventsToStreamsWriter(IStreams streams, IEventConverter eventConverter, IOffsetStore offsetStore, ILogger logger) { _streams = streams; _eventConverter = eventConverter; _logger = logger; + _offsetStore = offsetStore; } /// @@ -47,6 +52,7 @@ public Task Write(CommittedEvent @event, ScopeId scope, StreamId stream, Partiti /// public async Task Write(IEnumerable<(CommittedEvent, PartitionId)> events, ScopeId scope, StreamId stream, CancellationToken cancellationToken) { + var lastWrittenStreamPosition = await Write( await _streams.Get(scope, stream, cancellationToken).ConfigureAwait(false), streamPosition => @@ -80,6 +86,8 @@ async Task WriteOnlyNewEvents(IMongoCollection s }; IReadOnlyList eventsToWrite = ImmutableList.Empty; + var streamName = stream.CollectionNamespace.CollectionName; + try { @@ -87,15 +95,29 @@ async Task WriteOnlyNewEvents(IMongoCollection s try { session.StartTransaction(); - // TODO: Have a look at this. - var streamPosition = (ulong) await stream.CountDocumentsAsync( - session, - Builders.Filter.Empty, - cancellationToken: cancellationToken).ConfigureAwait(false); + + var storedNextOffset = await _offsetStore.GetNextOffset(streamName, session, cancellationToken).ConfigureAwait(false); + var streamPosition = storedNextOffset; + if (storedNextOffset < 1) + { + // No offset stored, either empty collection or legacy data + streamPosition = (ulong) await stream.CountDocumentsAsync( + session, + Builders.Filter.Empty, + cancellationToken: cancellationToken).ConfigureAwait(false); + + } + + eventsToWrite = createEventsToWrite(streamPosition); await WriteToCollection(eventsToWrite, session, cancellationToken).ConfigureAwait(false); + await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false); - return streamPosition + (ulong) (eventsToWrite.Count - 1); + var writtenOffset = streamPosition + (ulong) (eventsToWrite.Count - 1); + var nextOffset = writtenOffset + 1; + await _offsetStore.UpdateOffset(streamName, session, nextOffset, cancellationToken).ConfigureAwait(false); + + return writtenOffset; } catch { @@ -125,7 +147,10 @@ async Task WriteOnlyNewEvents(IMongoCollection s await WriteToCollection(eventsToWrite, session, cancellationToken).ConfigureAwait(false); await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false); - return streamPosition + (ulong) (eventsToWrite.Count - 1); + var writtenOffset = streamPosition + (ulong) (eventsToWrite.Count - 1); + var nextOffset = writtenOffset + 1; + await _offsetStore.UpdateOffset(streamName, session, nextOffset, cancellationToken).ConfigureAwait(false); + return writtenOffset; } catch { @@ -135,8 +160,6 @@ async Task WriteOnlyNewEvents(IMongoCollection s } } - - static Task> GetStoredEventStreamHeadOfStreamToWrite(IMongoCollection stream, IReadOnlyList eventsToWrite, IClientSessionHandle transaction, CancellationToken cancellationToken) => stream .Find( diff --git a/Source/Events.Store.MongoDB/Streams/IStreams.cs b/Source/Events.Store.MongoDB/Streams/IStreams.cs index 0e06b68b8..3f1a8d2bc 100644 --- a/Source/Events.Store.MongoDB/Streams/IStreams.cs +++ b/Source/Events.Store.MongoDB/Streams/IStreams.cs @@ -18,8 +18,6 @@ public interface IStreams : IEventStoreConnection /// IMongoCollection DefaultEventLog { get; } - IMongoCollection EventLogMetadata { get; } - /// /// Gets a non-public and non-event-log Stream. /// diff --git a/Source/Events.Store.MongoDB/Streams/IWriteEventsToStreamCollection.cs b/Source/Events.Store.MongoDB/Streams/IWriteEventsToStreamCollection.cs index 574f0df9d..d0101a118 100644 --- a/Source/Events.Store.MongoDB/Streams/IWriteEventsToStreamCollection.cs +++ b/Source/Events.Store.MongoDB/Streams/IWriteEventsToStreamCollection.cs @@ -28,7 +28,7 @@ Task Write( IMongoCollection stream, Func createStoreEvent, CancellationToken cancellationToken) where TEvent : IEvent; - + /// /// Writes multiple Event to Stream collection. /// diff --git a/Source/Events.Store.MongoDB/Streams/Streams.cs b/Source/Events.Store.MongoDB/Streams/Streams.cs index 4bb49a1c3..c052d26ac 100644 --- a/Source/Events.Store.MongoDB/Streams/Streams.cs +++ b/Source/Events.Store.MongoDB/Streams/Streams.cs @@ -18,7 +18,7 @@ namespace Dolittle.Runtime.Events.Store.MongoDB.Streams; [Singleton, PerTenant] public class Streams : EventStoreConnection, IStreams { - const string EventLogCollectionName = "event-log"; + public const string EventLogCollectionName = "event-log"; const string StreamDefinitionCollectionName = "stream-definitions"; readonly ILogger _logger; @@ -43,7 +43,7 @@ public Streams(IDatabaseConnection connection, ILogger logger) /// public IMongoCollection DefaultEventLog { get; } - public IMongoCollection EventLogMetadata { get; } + public IMongoCollection EventLogMetadata { get; } /// public Task> Get(ScopeId scopeId, StreamId streamId, CancellationToken token) diff --git a/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs b/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs index c448d3f17..ca15dd7e0 100644 --- a/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs +++ b/Specifications/Events.Store.MongoDB.Tests/EventStoreOffsetTests.cs @@ -1,7 +1,6 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using Dolittle.Runtime.Events.Store; using Dolittle.Runtime.Events.Store.MongoDB.Persistence; using FluentAssertions; using Xunit; @@ -13,7 +12,7 @@ public class EventStoreOffsetTests(MongoDatabaseFixture fixture): IClassFixture< [Fact] public async Task WhenNoOffsetExists() { - var nextOffset = await Sut.GetNextOffset(new ScopeId(Guid.NewGuid()), CancellationToken.None); + var nextOffset = await Sut.GetNextOffset(Guid.NewGuid().ToString(), null, CancellationToken.None); nextOffset.Should().Be(0); } @@ -22,9 +21,9 @@ public async Task WhenNoOffsetExists() public async Task WhenStoringDefaultScopeOffset() { ulong nextOffset = 9; - await StoreOffset(ScopeId.Default, nextOffset); + await StoreOffset("event-log", nextOffset); - var nextOffsetRetrieved = await Sut.GetNextOffset(ScopeId.Default, CancellationToken.None); + var nextOffsetRetrieved = await Sut.GetNextOffset("event-log", null, CancellationToken.None); nextOffsetRetrieved.Should().Be(nextOffset); } @@ -32,12 +31,12 @@ public async Task WhenStoringDefaultScopeOffset() [Fact] public async Task WhenUpdatingOffset() { - var scope = new ScopeId(Guid.NewGuid()); + var stream = Guid.NewGuid().ToString(); ulong nextOffset = 99; - await StoreOffset(scope, nextOffset); - await StoreOffset(scope, ++nextOffset); + await StoreOffset(stream, nextOffset); + await StoreOffset(stream, ++nextOffset); - var nextOffsetRetrieved = await Sut.GetNextOffset(scope, CancellationToken.None); + var nextOffsetRetrieved = await Sut.GetNextOffset(stream, null, CancellationToken.None); nextOffsetRetrieved.Should().Be(nextOffset); } @@ -45,31 +44,31 @@ public async Task WhenUpdatingOffset() [Fact] public async Task WhenRollingBackOffset() { - var scope = new ScopeId(Guid.NewGuid()); + var stream = Guid.NewGuid().ToString("N"); ulong nextOffset = 999; - await StoreOffset(scope, nextOffset); - await StoreOffsetButRollBack(scope, nextOffset + 1); + await StoreOffset(stream, nextOffset); + await StoreOffsetButRollBack(stream, nextOffset + 1); - var nextOffsetRetrieved = await Sut.GetNextOffset(scope, CancellationToken.None); + var nextOffsetRetrieved = await Sut.GetNextOffset(stream, null, CancellationToken.None); nextOffsetRetrieved.Should().Be(nextOffset); } - private async Task StoreOffset(ScopeId scope, ulong nextOffset) + private async Task StoreOffset(string stream, ulong nextOffset) { using var session = await fixture.Connection.MongoClient.StartSessionAsync(); session.StartTransaction(); - await Sut.UpdateOffset(session, scope, nextOffset, CancellationToken.None); + await Sut.UpdateOffset(stream, session, nextOffset, CancellationToken.None); await session.CommitTransactionAsync(); } - private async Task StoreOffsetButRollBack(ScopeId scope, ulong nextOffset) + private async Task StoreOffsetButRollBack(string stream, ulong nextOffset) { using var session = await fixture.Connection.MongoClient.StartSessionAsync(); session.StartTransaction(); - await Sut.UpdateOffset(session, scope, nextOffset, CancellationToken.None); + await Sut.UpdateOffset(stream, session, nextOffset, CancellationToken.None); await session.AbortTransactionAsync(); } - private EventLogOffsetStore Sut { get; } = new(fixture.Connection); + private OffsetStore Sut { get; } = new(fixture.Connection); } \ No newline at end of file