Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageVersion Include="SonarAnalyzer.CSharp" Version="9.30.0.95878" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="SonarAnalyzer.CSharp" PrivateAssets="all" />
Expand Down
9 changes: 4 additions & 5 deletions src/StreamWave.EntityFramework/AggregateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,19 @@ private void SaveEvents(IEventStream<TId> stream)
Guid.NewGuid(),
stream.Id,
version,
e.GetType().AssemblyQualifiedName ?? string.Empty,
e.EventType.AssemblyQualifiedName ?? string.Empty,
_serializer.Serialize(e.Event, e.EventType),
e.OccurredOn
));
}
}

public async Task<IEventStream<TId>?> LoadStreamAsync(TId id)
public IEventStream<TId> LoadStreamAsync(TId id)
{
var events = await context.Set<PersistedEvent<TId>>()
var events = context.Set<PersistedEvent<TId>>()
.Where(x => x.StreamId.Equals(id))
.OrderBy(x => x.Version)
.Select(x => GetEvent(x))
.ToArrayAsync();
.Select(x => GetEvent(x));

return EventStream.Create(id, events);
}
Expand Down
54 changes: 10 additions & 44 deletions src/StreamWave/Aggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public delegate TState CreateStateDelegate<out TState, in TId>()
/// <param name="state">The current state of the aggregate.</param>
/// <param name="e">The event to be applied.</param>
/// <returns>A task representing the asynchronous operation, with the updated state as the result.</returns>
public delegate Task<TState> ApplyEventDelegate<TState>(TState state, object e);
public delegate TState ApplyEventDelegate<TState>(TState state, object e);

/// <summary>
/// Delegate for loading the event stream of an aggregate based on its identifier.
Expand All @@ -27,7 +27,7 @@ public delegate TState CreateStateDelegate<out TState, in TId>()
/// <param name="id">The identifier of the aggregate.</param>
/// <returns>A task representing the asynchronous operation, with the loaded event stream as the result.
/// The result can be null if the event stream does not exist.</returns>
public delegate Task<IEventStream<TId>?> LoadEventStreamDelegate<TId>(TId id);
public delegate IEventStream<TId> LoadEventStreamDelegate<TId>(TId id);

/// <summary>
/// Delegate for saving the aggregate and returning the updated event stream.
Expand All @@ -46,12 +46,7 @@ public delegate TState CreateStateDelegate<out TState, in TId>()
/// <returns>An array of validation messages indicating the validation results.</returns>
public delegate ValidationMessage[] ValidateStateDelegate<in TState>(TState state);

/// <summary>
/// Aggregate class representing an aggregate in a domain-driven design context.
/// </summary>
/// <typeparam name="TState"></typeparam>
/// <typeparam name="TId"></typeparam>
public class Aggregate<TState, TId>
internal class Aggregate<TState, TId>
: IAggregate<TState, TId>
where TState : IAggregateState<TId>
{
Expand Down Expand Up @@ -81,65 +76,36 @@ internal Aggregate(

private IEventStream<TId> _stream;

/// <summary>
/// The current state of the aggregate.
/// </summary>
public TState State { get; private set; }

/// <summary>
/// Validation messages for the current state.
/// </summary>
public ValidationMessage[] Messages => _validator(State);

/// <summary>
/// Indicates whether the aggregate's state is valid.
/// </summary>
public bool IsValid => Messages.Length == 0;

/// <summary>
/// The event stream for external access.
/// </summary>
public IEventStream<TId> Stream => _stream;

/// <summary>
/// Applies an event to the aggregate, updating its state asynchronously.
/// </summary>
/// <param name="e"></param>
/// <returns></returns>
public async Task ApplyAsync(object e)
public Task ApplyAsync(object e)
{
State = await _applier(State, e);
State = _applier(State, e);
_stream.Append(e);
return Task.CompletedTask;
}

/// <summary>
/// Loads the aggregate's state from the event stream based on its ID.
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public async Task LoadAsync(TId id)
internal async Task LoadAsync(TId id)
{
_stream = await _loader(id) ?? EventStream.Create(id);
_stream = _loader(id) ?? EventStream.Create(id);
await UpdateState();
}

/// <summary>
/// Saves the current state of the aggregate, updating the event stream.
/// </summary>
/// <returns></returns>
public async Task SaveAsync()
internal async Task SaveAsync()
{
_stream = await _saver(this);
await UpdateState();
}

/// <summary>
/// Updates the aggregate's state by applying events from the event stream.
/// </summary>
/// <returns></returns>
private async Task UpdateState()
{
State = await _stream.Select(x => x.Event).AggregateAsync(_creator(), async (state, e) => await _applier(state, e));
State = await _stream.Select(x => x.Event).AggregateAsync(_creator(), (state, e) => _applier(state, e));
State.Id = _stream.Id;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/StreamWave/AggregateBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public IAggregateBuilder<TState, TId> WithEvents(EventRecord[] events)
return this;
}

public IAggregate<TState, TId> Build(IServiceProvider serviceProvider)
public Aggregate<TState, TId> Build(IServiceProvider serviceProvider)
{
var applier = _applier?.Invoke(serviceProvider) ?? AggregateBuilderDefaults.DefaultApplier(_events);
var validator = _validator?.Invoke(serviceProvider) ?? AggregateBuilderDefaults.DefaultValidator(_rules);
Expand Down Expand Up @@ -64,7 +64,7 @@ public IAggregateBuilder<TState, TId> WithApplier(Func<IServiceProvider, ApplyEv
return this;
}

public IAggregateBuilder<TState, TId> WithApplier<TEvent>(Func<TState, TEvent, Task<TState>> applier) where TEvent : notnull
public IAggregateBuilder<TState, TId> WithApplier<TEvent>(Func<TState, TEvent, TState> applier) where TEvent : notnull
{
_events.Add(typeof(TEvent), (state, e) => applier(state, (TEvent)e));
return this;
Expand Down
6 changes: 3 additions & 3 deletions src/StreamWave/AggregateBuilderDefaults.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
internal static class AggregateBuilderDefaults
{
public static ApplyEventDelegate<TState> DefaultApplier<TState>(Dictionary<Type, ApplyEventDelegate<TState>> events)
=> async (state, e) => events.TryGetValue(e.GetType(), out var applier)
? await applier(state, e)
=> (state, e) => events.TryGetValue(e.GetType(), out var applier)
? applier(state, e)
: state;

public static ValidateStateDelegate<TState> DefaultValidator<TState>(List<ValidationRule<TState>> rules) =>
Expand All @@ -13,7 +13,7 @@ public static ValidateStateDelegate<TState> DefaultValidator<TState>(List<Valida
.ToArray();

public static LoadEventStreamDelegate<TId> DefaultLoader<TId>(EventRecord[]? events = null)
=> (streamId) => Task.FromResult(events is not null ? EventStream.Create(streamId, events) : null);
=> (streamId) => EventStream.Create(streamId, events);

public static SaveAggregateDelegate<TState, TId> DefaultSaver<TState, TId>()
=> (aggregate) => Task.FromResult(EventStream.Create(aggregate.Stream.Id, aggregate.Stream.GetUncommittedEvents()));
Expand Down
13 changes: 8 additions & 5 deletions src/StreamWave/AggregateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
namespace StreamWave;

internal class AggregateManager<TState, TId>(
IAggregateBuilder<TState, TId> builder, IServiceProvider serviceProvider) : IAggregateManager<TState, TId>
AggregateBuilder<TState, TId> builder, IServiceProvider serviceProvider) : IAggregateManager<TState, TId>
where TState : IAggregateState<TId>
{
public Task<IAggregate<TState, TId>> Create()
public IAggregate<TState, TId> Create()
{
var aggregate = builder.Build(serviceProvider);
return Task.FromResult(aggregate);
return aggregate;
}

public async Task<IAggregate<TState, TId>> LoadAsync(TId id)
Expand All @@ -19,8 +19,11 @@ public async Task<IAggregate<TState, TId>> LoadAsync(TId id)
return aggregate;
}

public Task SaveAsync(IAggregate<TState, TId> aggregate)
public async Task SaveAsync(IAggregate<TState, TId> aggregate)
{
return aggregate.SaveAsync();
if(aggregate is Aggregate<TState, TId> a)
{
await a.SaveAsync();
}
}
}
57 changes: 29 additions & 28 deletions src/StreamWave/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static class EventStream
/// <param name="streamId">The identifier for the event stream.</param>
/// <param name="events">An optional array of initial events.</param>
/// <returns>An instance of <see cref="IEventStream{TId}"/>.</returns>
public static IEventStream<TId> Create<TId>(TId streamId, EventRecord[]? events = null)
public static IEventStream<TId> Create<TId>(TId streamId, IEnumerable<EventRecord>? events = null)
=> new EventStream<TId>(streamId, events);
}

Expand All @@ -23,10 +23,12 @@ public static IEventStream<TId> Create<TId>(TId streamId, EventRecord[]? events
/// <typeparam name="TId">The type of the identifier for the event stream.</typeparam>
/// <param name="streamId">The identifier for the event stream.</param>
/// <param name="events">An optional array of initial events.</param>
public class EventStream<TId>(TId streamId, EventRecord[]? events = null) : IEventStream<TId>
public class EventStream<TId>(TId streamId, IEnumerable<EventRecord>? events) : IEventStream<TId>
{
private readonly EventRecord[] _events = events ?? [];
private readonly IEnumerable<EventRecord> _events = events ?? [];
private readonly List<EventRecord> _uncommitted = [];
private DateTimeOffset? _createdOn = null;
private DateTimeOffset? _LastModifiedOn = null;

/// <summary>
/// Gets the identifier for the event stream.
Expand All @@ -41,31 +43,22 @@ public class EventStream<TId>(TId streamId, EventRecord[]? events = null) : IEve
/// <summary>
/// Gets the version of the event stream, based on the number of committed events.
/// </summary>
public int Version => _events.Length;
public int Version { get; private set; }

/// <summary>
/// Gets the expected version of the event stream, including uncommitted events.
/// </summary>
public int ExpectedVersion => _events.Length + _uncommitted.Count;
public int ExpectedVersion => Version + _uncommitted.Count;

/// <summary>
/// Gets the timestamp of when the first event was created, if available.
/// </summary>
public DateTimeOffset? CreatedOn => _events.Length != 0
? _events.FirstOrDefault()?.OccurredOn
: _uncommitted.FirstOrDefault()?.OccurredOn;
public DateTimeOffset? CreatedOn => _createdOn ?? TimeProvider.System.GetUtcNow();

/// <summary>
/// Gets the timestamp of when the last event was modified, if available.
/// </summary>
public DateTimeOffset? LastModifiedOn => _uncommitted.Count != 0
? _uncommitted.LastOrDefault()?.OccurredOn
: _events.LastOrDefault()?.OccurredOn;

/// <summary>
/// Gets the total count of committed events in the event stream.
/// </summary>
public int Count => _events.Length;
public DateTimeOffset? LastModifiedOn => _LastModifiedOn ?? TimeProvider.System.GetUtcNow();

/// <summary>
/// Appends a new event to the uncommitted events list.
Expand All @@ -74,18 +67,6 @@ public class EventStream<TId>(TId streamId, EventRecord[]? events = null) : IEve
public void Append(object e)
=> _uncommitted.Add(EventRecord.Create(e));

/// <summary>
/// Returns an enumerator that iterates through the committed events.
/// </summary>
/// <returns>An enumerator for the committed events.</returns>
public IEnumerator<EventRecord> GetEnumerator() => _events.ToList().GetEnumerator();

/// <summary>
/// Returns an enumerator that iterates through the committed events (non-generic).
/// </summary>
/// <returns>An enumerator for the committed events.</returns>
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

/// <summary>
/// Gets the list of uncommitted events.
/// </summary>
Expand All @@ -97,6 +78,26 @@ public void Append(object e)
/// </summary>
/// <returns>A new instance of <see cref="IEventStream{TId}"/> with the committed events.</returns>
public IEventStream<TId> Commit() => EventStream.Create(Id, [.. _events, .. _uncommitted]);

/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async IAsyncEnumerator<EventRecord> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
foreach (var e in _events)
{
if(_createdOn == null)
{
_createdOn = e.OccurredOn;
}
_LastModifiedOn = e.OccurredOn;
Version++;
yield return e;
await Task.Yield(); // Ensure it's asynchronous
}
}
}


Expand Down
31 changes: 0 additions & 31 deletions src/StreamWave/Extensions/EventStreamExtensions.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/StreamWave/Extensions/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static IAggregateBuilder<TState, TId> AddAggregate<TState, TId>(this ISer
where TState : IAggregateState<TId>
{
var builder = new AggregateBuilder<TState, TId>(initialState);
services.AddSingleton<IAggregateBuilder<TState, TId>>(builder);
services.AddSingleton(builder);
services.AddScoped<IAggregateManager<TState, TId>, AggregateManager<TState, TId>>();
return builder;
}
Expand Down
13 changes: 0 additions & 13 deletions src/StreamWave/IAggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,4 @@ public interface IAggregate<out TState, TId>
/// <param name="e">The event to apply.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task ApplyAsync(object e);

/// <summary>
/// Loads the aggregate's state from the event stream using the specified identifier.
/// </summary>
/// <param name="id">The identifier of the aggregate to load.</param>
/// <returns>A task representing the asynchronous operation.</returns>
internal Task LoadAsync(TId id);

/// <summary>
/// Saves the current state of the aggregate, committing any uncommitted events.
/// </summary>
/// <returns>A task representing the asynchronous operation.</returns>
internal Task SaveAsync();
}
9 changes: 1 addition & 8 deletions src/StreamWave/IAggregateBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface IAggregateBuilder<TState, TId>
/// <typeparam name="TEvent">The type of the event to be applied.</typeparam>
/// <param name="applier">A function that applies the specific event type to the state.</param>
/// <returns>The current instance of <see cref="IAggregateBuilder{TState, TId}"/>.</returns>
IAggregateBuilder<TState, TId> WithApplier<TEvent>(Func<TState, TEvent, Task<TState>> applier)
IAggregateBuilder<TState, TId> WithApplier<TEvent>(Func<TState, TEvent, TState> applier)
where TEvent : notnull;

/// <summary>
Expand All @@ -58,11 +58,4 @@ IAggregateBuilder<TState, TId> WithApplier<TEvent>(Func<TState, TEvent, Task<TSt
/// <param name="message">The validation message to be returned if the rule fails.</param>
/// <returns>The current instance of <see cref="IAggregateBuilder{TState, TId}"/>.</returns>
IAggregateBuilder<TState, TId> WithValidator(Func<TState, bool> rule, string message);

/// <summary>
///
/// </summary>
/// <param name="serviceProvider"></param>
/// <returns></returns>
IAggregate<TState, TId> Build(IServiceProvider serviceProvider);
}
Loading