From 9dbd81412dd3d8fc81612c84855fdee7b5b6da90 Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Thu, 1 Aug 2024 15:07:36 +0200 Subject: [PATCH 1/6] Use IAsyncEnumerable --- src/Directory.Packages.props | 1 + .../AggregateStore.cs | 7 +-- src/StreamWave/Aggregate.cs | 13 +++-- src/StreamWave/AggregateBuilder.cs | 2 +- src/StreamWave/AggregateBuilderDefaults.cs | 6 +- src/StreamWave/EventStream.cs | 55 +++++++++++-------- .../Extensions/EventStreamExtensions.cs | 31 ----------- src/StreamWave/IAggregateBuilder.cs | 2 +- src/StreamWave/IEventStream.cs | 2 +- src/StreamWave/StreamWave.csproj | 1 + .../UnitTest1.cs | 4 +- test/StreamWave.Tests/AggregateTest.cs | 16 +++--- test/StreamWave.Tests/RegistrationTest.cs | 2 +- 13 files changed, 60 insertions(+), 82 deletions(-) delete mode 100644 src/StreamWave/Extensions/EventStreamExtensions.cs diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 2a40045..344b759 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -15,6 +15,7 @@ + diff --git a/src/StreamWave.EntityFramework/AggregateStore.cs b/src/StreamWave.EntityFramework/AggregateStore.cs index c1266f3..2c56a3f 100644 --- a/src/StreamWave.EntityFramework/AggregateStore.cs +++ b/src/StreamWave.EntityFramework/AggregateStore.cs @@ -94,13 +94,12 @@ private void SaveEvents(IEventStream stream) } } - public async Task?> LoadStreamAsync(TId id) + public IEventStream LoadStreamAsync(TId id) { - var events = await context.Set>() + var events = context.Set>() .Where(x => x.StreamId.Equals(id)) .OrderBy(x => x.Version) - .Select(x => GetEvent(x)) - .ToArrayAsync(); + .Select(x => GetEvent(x)); return EventStream.Create(id, events); } diff --git a/src/StreamWave/Aggregate.cs b/src/StreamWave/Aggregate.cs index 19d6e56..240ab25 100644 --- a/src/StreamWave/Aggregate.cs +++ b/src/StreamWave/Aggregate.cs @@ -18,7 +18,7 @@ public delegate TState CreateStateDelegate() /// The current state of the aggregate. /// The event to be applied. /// A task representing the asynchronous operation, with the updated state as the result. -public delegate Task ApplyEventDelegate(TState state, object e); +public delegate TState ApplyEventDelegate(TState state, object e); /// /// Delegate for loading the event stream of an aggregate based on its identifier. @@ -27,7 +27,7 @@ public delegate TState CreateStateDelegate() /// The identifier of the aggregate. /// A task representing the asynchronous operation, with the loaded event stream as the result. /// The result can be null if the event stream does not exist. -public delegate Task?> LoadEventStreamDelegate(TId id); +public delegate IEventStream LoadEventStreamDelegate(TId id); /// /// Delegate for saving the aggregate and returning the updated event stream. @@ -106,10 +106,11 @@ internal Aggregate( /// /// /// - public async Task ApplyAsync(object e) + public Task ApplyAsync(object e) { - State = await _applier(State, e); + State = _applier(State, e); _stream.Append(e); + return Task.CompletedTask; } /// @@ -119,7 +120,7 @@ public async Task ApplyAsync(object e) /// public async Task LoadAsync(TId id) { - _stream = await _loader(id) ?? EventStream.Create(id); + _stream = _loader(id) ?? EventStream.Create(id); await UpdateState(); } @@ -139,7 +140,7 @@ public async Task SaveAsync() /// private async Task UpdateState() { - State = await _stream.Select(x => x.Event).AggregateAsync(_creator(), async (state, e) => await _applier(state, e)); + State = await _stream.Select(x => x.Event).AggregateAsync(_creator(), (state, e) => _applier(state, e)); State.Id = _stream.Id; } } diff --git a/src/StreamWave/AggregateBuilder.cs b/src/StreamWave/AggregateBuilder.cs index 8e358c5..6f3a92b 100644 --- a/src/StreamWave/AggregateBuilder.cs +++ b/src/StreamWave/AggregateBuilder.cs @@ -64,7 +64,7 @@ public IAggregateBuilder WithApplier(Func WithApplier(Func> applier) where TEvent : notnull + public IAggregateBuilder WithApplier(Func applier) where TEvent : notnull { _events.Add(typeof(TEvent), (state, e) => applier(state, (TEvent)e)); return this; diff --git a/src/StreamWave/AggregateBuilderDefaults.cs b/src/StreamWave/AggregateBuilderDefaults.cs index 6589a20..cb5f43a 100644 --- a/src/StreamWave/AggregateBuilderDefaults.cs +++ b/src/StreamWave/AggregateBuilderDefaults.cs @@ -3,8 +3,8 @@ internal static class AggregateBuilderDefaults { public static ApplyEventDelegate DefaultApplier(Dictionary> events) - => async (state, e) => events.TryGetValue(e.GetType(), out var applier) - ? await applier(state, e) + => (state, e) => events.TryGetValue(e.GetType(), out var applier) + ? applier(state, e) : state; public static ValidateStateDelegate DefaultValidator(List> rules) => @@ -13,7 +13,7 @@ public static ValidateStateDelegate DefaultValidator(List DefaultLoader(EventRecord[]? events = null) - => (streamId) => Task.FromResult(events is not null ? EventStream.Create(streamId, events) : null); + => (streamId) => EventStream.Create(streamId, events); public static SaveAggregateDelegate DefaultSaver() => (aggregate) => Task.FromResult(EventStream.Create(aggregate.Stream.Id, aggregate.Stream.GetUncommittedEvents())); diff --git a/src/StreamWave/EventStream.cs b/src/StreamWave/EventStream.cs index c7763af..d14e116 100644 --- a/src/StreamWave/EventStream.cs +++ b/src/StreamWave/EventStream.cs @@ -13,7 +13,7 @@ public static class EventStream /// The identifier for the event stream. /// An optional array of initial events. /// An instance of . - public static IEventStream Create(TId streamId, EventRecord[]? events = null) + public static IEventStream Create(TId streamId, IEnumerable? events = null) => new EventStream(streamId, events); } @@ -23,10 +23,12 @@ public static IEventStream Create(TId streamId, EventRecord[]? events /// The type of the identifier for the event stream. /// The identifier for the event stream. /// An optional array of initial events. -public class EventStream(TId streamId, EventRecord[]? events = null) : IEventStream +public class EventStream(TId streamId, IEnumerable? events) : IEventStream { - private readonly EventRecord[] _events = events ?? []; + private readonly IEnumerable _events = events ?? []; private readonly List _uncommitted = []; + private DateTimeOffset? _createdOn = null; + private DateTimeOffset? _LastModifiedOn = null; /// /// Gets the identifier for the event stream. @@ -41,31 +43,27 @@ public class EventStream(TId streamId, EventRecord[]? events = null) : IEve /// /// Gets the version of the event stream, based on the number of committed events. /// - public int Version => _events.Length; + public int Version { get; private set; } /// /// Gets the expected version of the event stream, including uncommitted events. /// - public int ExpectedVersion => _events.Length + _uncommitted.Count; + public int ExpectedVersion => Version + _uncommitted.Count; /// /// Gets the timestamp of when the first event was created, if available. /// - public DateTimeOffset? CreatedOn => _events.Length != 0 - ? _events.FirstOrDefault()?.OccurredOn - : _uncommitted.FirstOrDefault()?.OccurredOn; + public DateTimeOffset? CreatedOn => _createdOn ?? TimeProvider.System.GetUtcNow(); /// /// Gets the timestamp of when the last event was modified, if available. /// - public DateTimeOffset? LastModifiedOn => _uncommitted.Count != 0 - ? _uncommitted.LastOrDefault()?.OccurredOn - : _events.LastOrDefault()?.OccurredOn; + public DateTimeOffset? LastModifiedOn => _LastModifiedOn ?? TimeProvider.System.GetUtcNow(); /// /// Gets the total count of committed events in the event stream. /// - public int Count => _events.Length; + public int Count { get; private set; } /// /// Appends a new event to the uncommitted events list. @@ -74,18 +72,6 @@ public class EventStream(TId streamId, EventRecord[]? events = null) : IEve public void Append(object e) => _uncommitted.Add(EventRecord.Create(e)); - /// - /// Returns an enumerator that iterates through the committed events. - /// - /// An enumerator for the committed events. - public IEnumerator GetEnumerator() => _events.ToList().GetEnumerator(); - - /// - /// Returns an enumerator that iterates through the committed events (non-generic). - /// - /// An enumerator for the committed events. - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - /// /// Gets the list of uncommitted events. /// @@ -97,6 +83,27 @@ public void Append(object e) /// /// A new instance of with the committed events. public IEventStream Commit() => EventStream.Create(Id, [.. _events, .. _uncommitted]); + + /// + /// + /// + /// + /// + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + foreach (var e in _events) + { + if(_createdOn == null) + { + _createdOn = e.OccurredOn; + } + _LastModifiedOn = e.OccurredOn; + Count++; + Version++; + yield return e; + await Task.Yield(); // Ensure it's asynchronous + } + } } diff --git a/src/StreamWave/Extensions/EventStreamExtensions.cs b/src/StreamWave/Extensions/EventStreamExtensions.cs deleted file mode 100644 index f2f4f74..0000000 --- a/src/StreamWave/Extensions/EventStreamExtensions.cs +++ /dev/null @@ -1,31 +0,0 @@ -namespace StreamWave.Extensions; - -/// -/// Provides extension methods for working with event streams. -/// -public static class EventStreamExtensions -{ - /// - /// Aggregates a sequence of source elements asynchronously, applying a specified function to each element and accumulating the result. - /// - /// The type of the state being aggregated. - /// The type of the elements in the source sequence. - /// The sequence of source elements to aggregate. - /// The initial state to start aggregation from. - /// The asynchronous function to apply to each element in the sequence. - /// A task that represents the asynchronous operation. The task result contains the aggregated state. - public static async Task AggregateAsync( - this IEnumerable source, - TState aggregate, - Func> func) - { - using IEnumerator e = source.GetEnumerator(); - if (!e.MoveNext()) - { - return aggregate; - } - - while (e.MoveNext()) aggregate = await func(aggregate, e.Current); - return aggregate; - } -} diff --git a/src/StreamWave/IAggregateBuilder.cs b/src/StreamWave/IAggregateBuilder.cs index 3c3553f..588f6c1 100644 --- a/src/StreamWave/IAggregateBuilder.cs +++ b/src/StreamWave/IAggregateBuilder.cs @@ -41,7 +41,7 @@ public interface IAggregateBuilder /// The type of the event to be applied. /// A function that applies the specific event type to the state. /// The current instance of . - IAggregateBuilder WithApplier(Func> applier) + IAggregateBuilder WithApplier(Func applier) where TEvent : notnull; /// diff --git a/src/StreamWave/IEventStream.cs b/src/StreamWave/IEventStream.cs index 6fc8a21..4f4d26f 100644 --- a/src/StreamWave/IEventStream.cs +++ b/src/StreamWave/IEventStream.cs @@ -4,7 +4,7 @@ /// Represents an event stream, which is a sequence of events that captures changes to an aggregate's state. /// /// The type of the identifier for the event stream. -public interface IEventStream : IReadOnlyCollection +public interface IEventStream : IAsyncEnumerable { /// /// Gets the identifier of the event stream. diff --git a/src/StreamWave/StreamWave.csproj b/src/StreamWave/StreamWave.csproj index 74643d5..58eb51e 100644 --- a/src/StreamWave/StreamWave.csproj +++ b/src/StreamWave/StreamWave.csproj @@ -1,5 +1,6 @@  + diff --git a/test/StreamWave.EntityFramework.Tests/UnitTest1.cs b/test/StreamWave.EntityFramework.Tests/UnitTest1.cs index eb2479f..fbfcda5 100644 --- a/test/StreamWave.EntityFramework.Tests/UnitTest1.cs +++ b/test/StreamWave.EntityFramework.Tests/UnitTest1.cs @@ -33,7 +33,7 @@ public async Task register_storage_full_specs() .WithApplier((s, e) => { s.Test = e.Field; - return Task.FromResult(s); + return s; }); var provider = services.BuildServiceProvider(); @@ -65,7 +65,7 @@ public async Task register_storage_with_extension() .WithApplier((s, e) => { s.Test = e.Field; - return Task.FromResult(s); + return s; }); var provider = services.BuildServiceProvider(); diff --git a/test/StreamWave.Tests/AggregateTest.cs b/test/StreamWave.Tests/AggregateTest.cs index d0b3858..355d44a 100644 --- a/test/StreamWave.Tests/AggregateTest.cs +++ b/test/StreamWave.Tests/AggregateTest.cs @@ -11,10 +11,10 @@ public void Should_have_default_values() { var aggregate = new Aggregate( creator: () => new(), - applier: (s, _) => Task.FromResult(s), + applier: (s, _) => s, validator: _ => [], loader: id => { - return Task.FromResult?>(EventStream.Create(Guid.Empty)); + return EventStream.Create(Guid.Empty); }, saver: s => Task.FromResult(s.Stream.Commit())); @@ -36,12 +36,12 @@ public async Task should_call_loader_with_same_id() var loader = Substitute.For>(); loader(Arg.Is(guid)) - .Returns(Task.FromResult?>(EventStream.Create(guid))); + .Returns(EventStream.Create(guid)); var aggregate = new Aggregate( creator: () => new(), - applier: (s, _) => Task.FromResult(s), + applier: (s, _) => s, validator: _ => [], loader: (id) => loader(id), saver: s => Task.FromResult(s.Stream.Commit())); @@ -95,7 +95,7 @@ public async Task should_call_saver() { var aggregate = new Aggregate( creator: () => new(), - applier: (s, _) => Task.FromResult(s), + applier: (s, _) => s, validator: AggregateBuilderDefaults.DefaultValidator([]), loader: AggregateBuilderDefaults.DefaultLoader(), saver: s => Task.FromResult(s.Stream.Commit())); @@ -159,7 +159,7 @@ public async Task should_call_validator() var aggregate = new Aggregate( creator: () => new(), - applier: (s, _) => Task.FromResult(s), + applier: (s, _) => s, validator: validator, loader: AggregateBuilderDefaults.DefaultLoader(), saver: s => Task.FromResult(s.Stream.Commit())); @@ -184,7 +184,7 @@ public async Task should_return_validation_messages() var aggregate = new Aggregate( creator: () => new(), - applier: (s, _) => Task.FromResult(s), + applier: (s, _) => s, validator: validator, loader: AggregateBuilderDefaults.DefaultLoader(), saver: s => Task.FromResult(s.Stream.Commit())); @@ -213,7 +213,7 @@ public async Task should_return_loaded_stream() var aggregate = new Aggregate( creator: () => new(), - applier: (s, _) => Task.FromResult(s), + applier: (s, _) => s, validator: AggregateBuilderDefaults.DefaultValidator([]), loader: loader, saver: s => Task.FromResult(s.Stream.Commit())); diff --git a/test/StreamWave.Tests/RegistrationTest.cs b/test/StreamWave.Tests/RegistrationTest.cs index e635854..e5934b1 100644 --- a/test/StreamWave.Tests/RegistrationTest.cs +++ b/test/StreamWave.Tests/RegistrationTest.cs @@ -13,7 +13,7 @@ public async Task AddAggregate_Test() services.AddAggregate(() => new TestState { Id = Guid.NewGuid() }) .WithApplier((state, e) => { state.Id = e.Id; - return Task.FromResult(state); + return state; }); var provider = services.BuildServiceProvider(); From 7b30dbd053bff9f77c83d47d3f491b382bf0bc97 Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Thu, 1 Aug 2024 15:18:48 +0200 Subject: [PATCH 2/6] store the eventtype of the event not the eventrecord --- src/StreamWave.EntityFramework/AggregateStore.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/StreamWave.EntityFramework/AggregateStore.cs b/src/StreamWave.EntityFramework/AggregateStore.cs index 2c56a3f..a681a95 100644 --- a/src/StreamWave.EntityFramework/AggregateStore.cs +++ b/src/StreamWave.EntityFramework/AggregateStore.cs @@ -87,7 +87,7 @@ private void SaveEvents(IEventStream stream) Guid.NewGuid(), stream.Id, version, - e.GetType().AssemblyQualifiedName ?? string.Empty, + e.EventType.AssemblyQualifiedName ?? string.Empty, _serializer.Serialize(e.Event, e.EventType), e.OccurredOn )); From 3c65a1fc85e1390cc2249c152fc256a8726de680 Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Thu, 1 Aug 2024 15:28:37 +0200 Subject: [PATCH 3/6] Remove LoadAsync Completely --- src/StreamWave/Aggregate.cs | 4 ++-- src/StreamWave/AggregateBuilder.cs | 2 +- src/StreamWave/AggregateManager.cs | 13 ++++++++----- .../Extensions/ServiceCollectionExtensions.cs | 2 +- src/StreamWave/IAggregate.cs | 13 ------------- src/StreamWave/IAggregateBuilder.cs | 7 ------- src/StreamWave/IAggregateManager.cs | 2 +- test/StreamWave.EntityFramework.Tests/UnitTest1.cs | 6 +++--- 8 files changed, 16 insertions(+), 33 deletions(-) diff --git a/src/StreamWave/Aggregate.cs b/src/StreamWave/Aggregate.cs index 240ab25..3c439f3 100644 --- a/src/StreamWave/Aggregate.cs +++ b/src/StreamWave/Aggregate.cs @@ -118,7 +118,7 @@ public Task ApplyAsync(object e) /// /// /// - public async Task LoadAsync(TId id) + internal async Task LoadAsync(TId id) { _stream = _loader(id) ?? EventStream.Create(id); await UpdateState(); @@ -128,7 +128,7 @@ public async Task LoadAsync(TId id) /// Saves the current state of the aggregate, updating the event stream. /// /// - public async Task SaveAsync() + internal async Task SaveAsync() { _stream = await _saver(this); await UpdateState(); diff --git a/src/StreamWave/AggregateBuilder.cs b/src/StreamWave/AggregateBuilder.cs index 6f3a92b..b987318 100644 --- a/src/StreamWave/AggregateBuilder.cs +++ b/src/StreamWave/AggregateBuilder.cs @@ -24,7 +24,7 @@ public IAggregateBuilder WithEvents(EventRecord[] events) return this; } - public IAggregate Build(IServiceProvider serviceProvider) + public Aggregate Build(IServiceProvider serviceProvider) { var applier = _applier?.Invoke(serviceProvider) ?? AggregateBuilderDefaults.DefaultApplier(_events); var validator = _validator?.Invoke(serviceProvider) ?? AggregateBuilderDefaults.DefaultValidator(_rules); diff --git a/src/StreamWave/AggregateManager.cs b/src/StreamWave/AggregateManager.cs index bd288ce..f559019 100644 --- a/src/StreamWave/AggregateManager.cs +++ b/src/StreamWave/AggregateManager.cs @@ -3,13 +3,13 @@ namespace StreamWave; internal class AggregateManager( - IAggregateBuilder builder, IServiceProvider serviceProvider) : IAggregateManager + AggregateBuilder builder, IServiceProvider serviceProvider) : IAggregateManager where TState : IAggregateState { - public Task> Create() + public IAggregate Create() { var aggregate = builder.Build(serviceProvider); - return Task.FromResult(aggregate); + return aggregate; } public async Task> LoadAsync(TId id) @@ -19,8 +19,11 @@ public async Task> LoadAsync(TId id) return aggregate; } - public Task SaveAsync(IAggregate aggregate) + public async Task SaveAsync(IAggregate aggregate) { - return aggregate.SaveAsync(); + if(aggregate is Aggregate a) + { + await a.SaveAsync(); + } } } diff --git a/src/StreamWave/Extensions/ServiceCollectionExtensions.cs b/src/StreamWave/Extensions/ServiceCollectionExtensions.cs index bae96ad..2134a8a 100644 --- a/src/StreamWave/Extensions/ServiceCollectionExtensions.cs +++ b/src/StreamWave/Extensions/ServiceCollectionExtensions.cs @@ -20,7 +20,7 @@ public static IAggregateBuilder AddAggregate(this ISer where TState : IAggregateState { var builder = new AggregateBuilder(initialState); - services.AddSingleton>(builder); + services.AddSingleton(builder); services.AddScoped, AggregateManager>(); return builder; } diff --git a/src/StreamWave/IAggregate.cs b/src/StreamWave/IAggregate.cs index 6f8a6fb..3738307 100644 --- a/src/StreamWave/IAggregate.cs +++ b/src/StreamWave/IAggregate.cs @@ -33,17 +33,4 @@ public interface IAggregate /// The event to apply. /// A task representing the asynchronous operation. Task ApplyAsync(object e); - - /// - /// Loads the aggregate's state from the event stream using the specified identifier. - /// - /// The identifier of the aggregate to load. - /// A task representing the asynchronous operation. - internal Task LoadAsync(TId id); - - /// - /// Saves the current state of the aggregate, committing any uncommitted events. - /// - /// A task representing the asynchronous operation. - internal Task SaveAsync(); } diff --git a/src/StreamWave/IAggregateBuilder.cs b/src/StreamWave/IAggregateBuilder.cs index 588f6c1..841038f 100644 --- a/src/StreamWave/IAggregateBuilder.cs +++ b/src/StreamWave/IAggregateBuilder.cs @@ -58,11 +58,4 @@ IAggregateBuilder WithApplier(Func /// The validation message to be returned if the rule fails. /// The current instance of . IAggregateBuilder WithValidator(Func rule, string message); - - /// - /// - /// - /// - /// - IAggregate Build(IServiceProvider serviceProvider); } diff --git a/src/StreamWave/IAggregateManager.cs b/src/StreamWave/IAggregateManager.cs index ffd990d..0ee094d 100644 --- a/src/StreamWave/IAggregateManager.cs +++ b/src/StreamWave/IAggregateManager.cs @@ -11,7 +11,7 @@ public interface IAggregateManager /// Creates a new aggregate with the initial state. /// /// A task representing the asynchronous operation, with the newly created aggregate as the result. - Task> Create(); + IAggregate Create(); /// /// Loads an existing aggregate based on its identifier. diff --git a/test/StreamWave.EntityFramework.Tests/UnitTest1.cs b/test/StreamWave.EntityFramework.Tests/UnitTest1.cs index fbfcda5..ea3a98a 100644 --- a/test/StreamWave.EntityFramework.Tests/UnitTest1.cs +++ b/test/StreamWave.EntityFramework.Tests/UnitTest1.cs @@ -40,7 +40,7 @@ public async Task register_storage_full_specs() var manager = provider.GetRequiredService>(); - var aggregate = await manager.Create(); + var aggregate = manager.Create(); await aggregate.ApplyAsync(new TestEvent("test")); @@ -71,8 +71,8 @@ public async Task register_storage_with_extension() var provider = services.BuildServiceProvider(); var manager = provider.GetRequiredService>(); - var aggregate = await manager.Create(); - var aggregate1 = await manager.Create(); + var aggregate = manager.Create(); + var aggregate1 = manager.Create(); await aggregate.ApplyAsync(new TestEvent("Aggregate 0")); await aggregate1.ApplyAsync(new TestEvent("Aggregate 1")); From 6483608a104c4d601148a34cce141bda833dfbbc Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Thu, 1 Aug 2024 15:46:14 +0200 Subject: [PATCH 4/6] Remove Comments because not part of the public api --- src/StreamWave/Aggregate.cs | 37 +------------------------------------ 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/src/StreamWave/Aggregate.cs b/src/StreamWave/Aggregate.cs index 3c439f3..0a899c3 100644 --- a/src/StreamWave/Aggregate.cs +++ b/src/StreamWave/Aggregate.cs @@ -46,12 +46,7 @@ public delegate TState CreateStateDelegate() /// An array of validation messages indicating the validation results. public delegate ValidationMessage[] ValidateStateDelegate(TState state); -/// -/// Aggregate class representing an aggregate in a domain-driven design context. -/// -/// -/// -public class Aggregate +internal class Aggregate : IAggregate where TState : IAggregateState { @@ -81,31 +76,14 @@ internal Aggregate( private IEventStream _stream; - /// - /// The current state of the aggregate. - /// public TState State { get; private set; } - /// - /// Validation messages for the current state. - /// public ValidationMessage[] Messages => _validator(State); - /// - /// Indicates whether the aggregate's state is valid. - /// public bool IsValid => Messages.Length == 0; - /// - /// The event stream for external access. - /// public IEventStream Stream => _stream; - /// - /// Applies an event to the aggregate, updating its state asynchronously. - /// - /// - /// public Task ApplyAsync(object e) { State = _applier(State, e); @@ -113,31 +91,18 @@ public Task ApplyAsync(object e) return Task.CompletedTask; } - /// - /// Loads the aggregate's state from the event stream based on its ID. - /// - /// - /// internal async Task LoadAsync(TId id) { _stream = _loader(id) ?? EventStream.Create(id); await UpdateState(); } - /// - /// Saves the current state of the aggregate, updating the event stream. - /// - /// internal async Task SaveAsync() { _stream = await _saver(this); await UpdateState(); } - /// - /// Updates the aggregate's state by applying events from the event stream. - /// - /// private async Task UpdateState() { State = await _stream.Select(x => x.Event).AggregateAsync(_creator(), (state, e) => _applier(state, e)); From e195cf4b5aa32b812f75c2f9f1a946506c5dff3f Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Thu, 1 Aug 2024 15:48:02 +0200 Subject: [PATCH 5/6] Remove SystemTime --- src/StreamWave/SystemTime.cs | 28 ---------------------------- 1 file changed, 28 deletions(-) delete mode 100644 src/StreamWave/SystemTime.cs diff --git a/src/StreamWave/SystemTime.cs b/src/StreamWave/SystemTime.cs deleted file mode 100644 index 95e7939..0000000 --- a/src/StreamWave/SystemTime.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System.Diagnostics.CodeAnalysis; - -namespace StreamWave; - -/// -/// Used for getting DateTime.Now(), time is changeable for unit testing -/// -[SuppressMessage("Major Code Smell", "S6354:Use a testable date/time provider", Justification = "this is the testable date/time provider!")] -public static class SystemTime -{ - /// Normally this is a pass-through to DateTime.Now, but it can be overridden with SetDateTime( .. ) for testing or debugging. - /// - public static Func Now { get; private set; } = () => DateTime.Now; - - /// Set time to return when SystemTime.Now() is called. - /// - public static void SetDateTime(DateTime dateTimeNow) - { - Now = () => dateTimeNow; - } - - /// Resets SystemTime.Now() to return DateTime.Now. - /// - public static void ResetDateTime() - { - Now = () => DateTime.Now; - } -} From 432ff8c921f76dae3cde080d025fde0bc7282c71 Mon Sep 17 00:00:00 2001 From: Patrick Evers Date: Thu, 1 Aug 2024 15:51:34 +0200 Subject: [PATCH 6/6] Remove Count --- src/StreamWave/EventStream.cs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/StreamWave/EventStream.cs b/src/StreamWave/EventStream.cs index d14e116..52a2495 100644 --- a/src/StreamWave/EventStream.cs +++ b/src/StreamWave/EventStream.cs @@ -60,11 +60,6 @@ public class EventStream(TId streamId, IEnumerable? events) : /// public DateTimeOffset? LastModifiedOn => _LastModifiedOn ?? TimeProvider.System.GetUtcNow(); - /// - /// Gets the total count of committed events in the event stream. - /// - public int Count { get; private set; } - /// /// Appends a new event to the uncommitted events list. /// @@ -98,7 +93,6 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken _createdOn = e.OccurredOn; } _LastModifiedOn = e.OccurredOn; - Count++; Version++; yield return e; await Task.Yield(); // Ensure it's asynchronous