Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate events streaming #160

Merged
merged 18 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Source/Aggregates/ActivityExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ namespace Dolittle.SDK.Aggregates;
public static class ActivityExtensions
{
const string AggregateRootId = "aggregate.id";



/// <summary>
/// Set parent execution context if spanId is present
/// </summary>
Expand Down
83 changes: 59 additions & 24 deletions Source/Aggregates/AggregateRoot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dolittle.SDK.Artifacts;
using Dolittle.SDK.Events;
using Dolittle.SDK.Events.Store;
Expand All @@ -13,26 +14,32 @@ namespace Dolittle.SDK.Aggregates;
/// <summary>
/// Represents the aggregate root.
/// </summary>
public class AggregateRoot
public abstract class AggregateRoot
{
readonly IList<AppliedEvent> _appliedEvents;
readonly List<AppliedEvent> _appliedEvents = new();

/// <summary>
/// Initializes a new instance of the <see cref="AggregateRoot"/> class.
/// </summary>
/// <param name="eventSourceId">The <see cref="Events.EventSourceId" />.</param>
public AggregateRoot(EventSourceId eventSourceId)
protected AggregateRoot(EventSourceId eventSourceId)
{
EventSourceId = eventSourceId;
AggregateRootId = this.GetAggregateRootId();
Version = AggregateRootVersion.Initial;
_appliedEvents = new List<AppliedEvent>();
IsStateless = this.IsStateless();
}

/// <summary>
/// Gets the current <see cref="AggregateRootVersion" />.
/// </summary>
public AggregateRootVersion Version { get; private set; }

/// <summary>
/// Gets the <see cref="Events.AggregateRootId"/>.
/// </summary>
public AggregateRootId AggregateRootId { get; }

/// <summary>
/// Gets the <see cref="Events.EventSourceId" /> that the <see cref="AggregateRoot" /> applies events to.
/// </summary>
Expand All @@ -42,6 +49,11 @@ public AggregateRoot(EventSourceId eventSourceId)
/// Gets the <see cref="IEnumerable{T}" /> of applied events to commit.
/// </summary>
public IEnumerable<AppliedEvent> AppliedEvents => _appliedEvents;

/// <summary>
/// Gets a value indicating whether this aggregate root is stateless or not.
/// </summary>
public bool IsStateless { get; }
woksin marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Apply the event to the <see cref="AggregateRoot" /> so that it will be committed to the <see cref="IEventStore" />
Expand Down Expand Up @@ -124,25 +136,47 @@ public void ApplyPublic(object @event, EventType eventType)
=> Apply(@event, eventType, true);

/// <summary>
/// Re-apply events from the Event Store.
/// Rehydrates the aggregate root with the <see cref="CommittedAggregateEvents"/> for this aggregate.
/// </summary>
/// <param name="events">Sequence that contains the events to re-apply.</param>
public virtual void ReApply(CommittedAggregateEvents events)
/// <param name="aggregateRootId">The aggregate root id.</param>
/// <param name="batches">The <see cref="IAsyncEnumerator{T}"/> batches of <see cref="CommittedAggregateEvents"/> to rehydrate with.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used for cancelling the rehydration.</param>
public async Task Rehydrate(IAsyncEnumerable<CommittedAggregateEvents> batches, CancellationToken cancellationToken)
{
ThrowIfEventWasAppliedToOtherEventSource(events);
ThrowIfEventWasAppliedByOtherAggregateRoot(events);

foreach (var @event in events)
var batchesEnumerator = batches.WithCancellation(cancellationToken).GetAsyncEnumerator();
woksin marked this conversation as resolved.
Show resolved Hide resolved
if (!await batchesEnumerator.MoveNextAsync())
{
throw new NoCommittedAggregateEventsBatches(AggregateRootId, EventSourceId);
}

do
{
ThrowIfAggreggateRootVersionIsOutOfOrder(@event);
Version++;
if (!this.IsStateless()) InvokeOnMethod(@event.Content);
var batch = batchesEnumerator.Current;
ThrowIfEventWasAppliedToOtherEventSource(batch);
ThrowIfEventWasAppliedByOtherAggregateRoot(batch);
if (IsStateless)
{
Version = batch.AggregateRootVersion;
break;
}
foreach (var @event in batch)
{
Version = @event.AggregateRootVersion + 1;
woksin marked this conversation as resolved.
Show resolved Hide resolved
InvokeOnMethod(@event.Content);
}
}
while (await batchesEnumerator.MoveNextAsync());
cancellationToken.ThrowIfCancellationRequested();
}

void Apply(object @event, EventType eventType, bool isPublic)

void Apply(object @event, EventType? eventType, bool isPublic)
{
if (@event == null) throw new EventContentCannotBeNull();
if (@event == null)
{
throw new EventContentCannotBeNull();
}

_appliedEvents.Add(new AppliedEvent(@event, eventType, isPublic));
Version++;
InvokeOnMethod(@event);
Expand All @@ -156,19 +190,20 @@ void InvokeOnMethod(object @event)
}
}

void ThrowIfAggreggateRootVersionIsOutOfOrder(CommittedAggregateEvent @event)
{
if (@event.AggregateRootVersion != Version) throw new AggregateRootVersionIsOutOfOrder(@event.AggregateRootVersion, Version);
}

void ThrowIfEventWasAppliedByOtherAggregateRoot(CommittedAggregateEvents events)
{
var aggregateRootId = this.GetAggregateRootId();
if (events.AggregateRoot != this.GetAggregateRootId()) throw new EventWasAppliedByOtherAggregateRoot(events.AggregateRoot, aggregateRootId);
if (events.AggregateRoot != this.GetAggregateRootId())
{
throw new EventWasAppliedByOtherAggregateRoot(events.AggregateRoot, aggregateRootId);
}
}

void ThrowIfEventWasAppliedToOtherEventSource(CommittedAggregateEvents events)
{
if (events.EventSource != EventSourceId) throw new EventWasAppliedToOtherEventSource(events.EventSource, EventSourceId);
if (events.EventSource != EventSourceId)
{
throw new EventWasAppliedToOtherEventSource(events.EventSource, EventSourceId);
}
}
}
}
16 changes: 14 additions & 2 deletions Source/Aggregates/AggregateRootExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ public static bool TryGetOnMethod(this AggregateRoot aggregateRoot, object @even
public static bool IsStateless(this AggregateRoot aggregateRoot)
=> GetHandleMethodsFor(aggregateRoot.GetType()).Count == 0;

/// <summary>
/// Gets all the <see cref="IEnumerable{T}"/> of <see cref="EventType"/> that the
/// </summary>
/// <param name="aggregateRoot"></param>
/// <param name="eventTypes"></param>
/// <returns></returns>
public static IEnumerable<EventType> GetEventTypes(this AggregateRoot aggregateRoot, IEventTypes eventTypes)
=> GetHandleMethodsFor(aggregateRoot.GetType()).Keys.Select(eventTypes.GetFor);

/// <summary>
/// Gets the <see cref="AggregateRootId" /> of an <see cref="AggregateRoot" />.
/// </summary>
Expand All @@ -45,7 +54,10 @@ public static AggregateRootId GetAggregateRootId(this AggregateRoot aggregateRoo
{
var aggregateRootType = aggregateRoot.GetType();
var aggregateRootAttribute = aggregateRootType.GetCustomAttribute<AggregateRootAttribute>();
if (aggregateRootAttribute == null) throw new MissingAggregateRootAttribute(aggregateRootType);
if (aggregateRootAttribute == null)
{
throw new MissingAggregateRootAttribute(aggregateRootType);
}
return aggregateRootAttribute.Type.Id;
}

Expand Down Expand Up @@ -73,4 +85,4 @@ static AggregateRootHandleMethods()
}
}
}
}
}
23 changes: 7 additions & 16 deletions Source/Aggregates/AggregateRootOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ public async Task Perform(Func<TAggregate, Task> method, CancellationToken cance

var aggregateRootId = aggregateRoot.GetAggregateRootId();
activity?.Tag(aggregateRootId);
await ReApplyEvents(aggregateRoot, aggregateRootId, cancellationToken).ConfigureAwait(false);

await Rehydrate(aggregateRoot, aggregateRootId, cancellationToken).ConfigureAwait(false);
_logger.PerformingOn(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.EventSourceId);
await method(aggregateRoot).ConfigureAwait(false);

if (aggregateRoot.AppliedEvents.Any())
{
await CommitAppliedEvents(aggregateRoot, aggregateRootId).ConfigureAwait(false);
Expand All @@ -95,21 +93,14 @@ bool TryGetAggregateRoot(EventSourceId eventSourceId, out TAggregate aggregateRo
return getAggregateRoot.Success;
}

async Task ReApplyEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId, CancellationToken cancellationToken)
Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, CancellationToken cancellationToken)
{
var eventSourceId = aggregateRoot.EventSourceId;
_logger.ReApplyingEventsFor(typeof(TAggregate), aggregateRootId, eventSourceId);

var committedEvents = await _eventStore.FetchForAggregate(aggregateRootId, eventSourceId, cancellationToken).ConfigureAwait(false);
if (committedEvents.HasEvents)
{
_logger.ReApplying(committedEvents.Count);
aggregateRoot.ReApply(committedEvents);
}
else
{
_logger.NoEventsToReApply();
}
_logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId);
var eventTypesToFetch = aggregateRoot.GetEventTypes(_eventTypes);

var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.Rehydrate(committedEventsBatches, cancellationToken);
}

Task<CommittedAggregateEvents> CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId)
Expand Down
4 changes: 2 additions & 2 deletions Source/Aggregates/AppliedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class AppliedEvent
/// <param name="event">The event content.</param>
/// <param name="eventType">The <see cref="EventType" />.</param>
/// <param name="isPublic">Whether the event is public or not.</param>
public AppliedEvent(object @event, EventType eventType, bool isPublic)
public AppliedEvent(object @event, EventType? eventType, bool isPublic)
{
Event = @event;
EventType = eventType;
Expand All @@ -31,7 +31,7 @@ public AppliedEvent(object @event, EventType eventType, bool isPublic)
/// <summary>
/// Gets the event's <see cref="EventType" />.
/// </summary>
public EventType EventType { get; }
public EventType? EventType { get; }

/// <summary>
/// Gets a value indicating whether this event is public or not.
Expand Down
4 changes: 2 additions & 2 deletions Source/Aggregates/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ static partial class Log
[LoggerMessage(0, LogLevel.Debug,"Performing operation on {AggregateRoot} with aggregate root id {AggregateRootId} applying events to event source {EventSource}")]
internal static partial void PerformingOn(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);

[LoggerMessage(0, LogLevel.Debug, "Re-applying events for {AggregateRoot} with aggregate root id {AggregateRootId} with event source id {EventSource}")]
internal static partial void ReApplyingEventsFor(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);
[LoggerMessage(0, LogLevel.Debug, "Rehydrating {AggregateRoot} with aggregate root id {AggregateRootId} with event source id {EventSource}")]
internal static partial void RehydratingAggregateRoot(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);

[LoggerMessage(0, LogLevel.Trace, "Re-applying {NumberOfEvents} events")]
internal static partial void ReApplying(this ILogger logger, int numberOfEvents);
Expand Down
23 changes: 23 additions & 0 deletions Source/Aggregates/NoCommittedAggregateEventsBatches.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.SDK.Events;

namespace Dolittle.SDK.Aggregates;

/// <summary>
/// Exception that gets thrown when the fetched aggregate root event stream has no batches.
/// </summary>
public class NoCommittedAggregateEventsBatches : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="NoCommittedAggregateEventsBatches"/> class.
/// </summary>
/// <param name="aggregateRootId">The aggregate root id.</param>
/// <param name="eventSourceId">The event source id.</param>
public NoCommittedAggregateEventsBatches(AggregateRootId aggregateRootId, EventSourceId eventSourceId)
: base($"No batches of committed aggregate events were received when fetching for aggregate root '{aggregateRootId}' with event source '{eventSourceId}'")
{
}
}
8 changes: 4 additions & 4 deletions Source/Events/Store/AggregaterootVersionIsOutOfOrder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public class AggregateRootVersionIsOutOfOrder : ArgumentException
/// Initializes a new instance of the <see cref="AggregateRootVersionIsOutOfOrder"/> class.
/// </summary>
/// <param name="eventVersion">The <see cref="AggregateRootVersion"/> the Event was applied by.</param>
/// <param name="expectedVersion">Expected <see cref="AggregateRootVersion"/>.</param>
public AggregateRootVersionIsOutOfOrder(AggregateRootVersion eventVersion, AggregateRootVersion expectedVersion)
: base($"Aggregate Root version is out of order. Version '{eventVersion}' from event does not match '{expectedVersion}'")
/// <param name="previousVersion">Previous <see cref="AggregateRootVersion"/>.</param>
public AggregateRootVersionIsOutOfOrder(AggregateRootVersion eventVersion, AggregateRootVersion previousVersion)
: base($"Aggregate Root version is out of order. Version '{eventVersion}' from event is not greater than previous version '{previousVersion}'")
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public Task<CommittedAggregateEvents> Commit(
Action<UncommittedAggregateEventsBuilder> callback,
CancellationToken cancellationToken = default)
{
if (_builder != default) throw new EventBuilderMethodAlreadyCalled("Commit");
if (_builder != default)
{
throw new EventBuilderMethodAlreadyCalled("Commit");
}
_builder = new UncommittedAggregateEventsBuilder(_aggregateRootId, _eventSourceId, _expectedVersion);
callback(_builder);
var uncommittedAggregateEvents = _builder.Build(_eventTypes);
Expand Down
Loading