Skip to content

Commit

Permalink
Merge pull request #160 from dolittle/aggregate-events-streaming
Browse files Browse the repository at this point in the history
Aggregate events streaming
  • Loading branch information
woksin authored Sep 15, 2022
2 parents 8a9c588 + 727bab4 commit 66d6c77
Show file tree
Hide file tree
Showing 34 changed files with 439 additions and 155 deletions.
3 changes: 1 addition & 2 deletions Source/Aggregates/ActivityExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ namespace Dolittle.SDK.Aggregates;
public static class ActivityExtensions
{
const string AggregateRootId = "aggregate.id";



/// <summary>
/// Set parent execution context if spanId is present
/// </summary>
Expand Down
77 changes: 53 additions & 24 deletions Source/Aggregates/AggregateRoot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dolittle.SDK.Artifacts;
using Dolittle.SDK.Events;
using Dolittle.SDK.Events.Store;
Expand All @@ -13,26 +14,32 @@ namespace Dolittle.SDK.Aggregates;
/// <summary>
/// Represents the aggregate root.
/// </summary>
public class AggregateRoot
public abstract class AggregateRoot
{
readonly IList<AppliedEvent> _appliedEvents;
readonly List<AppliedEvent> _appliedEvents = new();

/// <summary>
/// Initializes a new instance of the <see cref="AggregateRoot"/> class.
/// </summary>
/// <param name="eventSourceId">The <see cref="Events.EventSourceId" />.</param>
public AggregateRoot(EventSourceId eventSourceId)
protected AggregateRoot(EventSourceId eventSourceId)
{
EventSourceId = eventSourceId;
AggregateRootId = this.GetAggregateRootId();
Version = AggregateRootVersion.Initial;
_appliedEvents = new List<AppliedEvent>();
IsStateless = this.IsStateless();
}

/// <summary>
/// Gets the current <see cref="AggregateRootVersion" />.
/// </summary>
public AggregateRootVersion Version { get; private set; }

/// <summary>
/// Gets the <see cref="Events.AggregateRootId"/>.
/// </summary>
public AggregateRootId AggregateRootId { get; }

/// <summary>
/// Gets the <see cref="Events.EventSourceId" /> that the <see cref="AggregateRoot" /> applies events to.
/// </summary>
Expand All @@ -42,6 +49,8 @@ public AggregateRoot(EventSourceId eventSourceId)
/// Gets the <see cref="IEnumerable{T}" /> of applied events to commit.
/// </summary>
public IEnumerable<AppliedEvent> AppliedEvents => _appliedEvents;

bool IsStateless { get; }

/// <summary>
/// Apply the event to the <see cref="AggregateRoot" /> so that it will be committed to the <see cref="IEventStore" />
Expand Down Expand Up @@ -124,25 +133,44 @@ public void ApplyPublic(object @event, EventType eventType)
=> Apply(@event, eventType, true);

/// <summary>
/// Re-apply events from the Event Store.
/// Rehydrates the aggregate root with the <see cref="CommittedAggregateEvents"/> for this aggregate.
/// </summary>
/// <param name="events">Sequence that contains the events to re-apply.</param>
public virtual void ReApply(CommittedAggregateEvents events)
/// <param name="batches">The <see cref="IAsyncEnumerator{T}"/> batches of <see cref="CommittedAggregateEvents"/> to rehydrate with.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used for cancelling the rehydration.</param>
public async Task Rehydrate(IAsyncEnumerable<CommittedAggregateEvents> batches, CancellationToken cancellationToken)
{
ThrowIfEventWasAppliedToOtherEventSource(events);
ThrowIfEventWasAppliedByOtherAggregateRoot(events);

foreach (var @event in events)
var hasBatches = false;
await foreach (var batch in batches.WithCancellation(cancellationToken))
{
ThrowIfAggreggateRootVersionIsOutOfOrder(@event);
Version++;
if (!this.IsStateless()) InvokeOnMethod(@event.Content);
hasBatches = true;
ThrowIfEventWasAppliedToOtherEventSource(batch);
ThrowIfEventWasAppliedByOtherAggregateRoot(batch);
if (IsStateless)
{
Version = batch.AggregateRootVersion;
break;
}
foreach (var @event in batch)
{
Version = @event.AggregateRootVersion + 1;
InvokeOnMethod(@event.Content);
}
}
if (!hasBatches)
{
throw new NoCommittedAggregateEventsBatches(AggregateRootId, EventSourceId);
}
cancellationToken.ThrowIfCancellationRequested();
}

void Apply(object @event, EventType eventType, bool isPublic)

void Apply(object @event, EventType? eventType, bool isPublic)
{
if (@event == null) throw new EventContentCannotBeNull();
if (@event == null)
{
throw new EventContentCannotBeNull();
}

_appliedEvents.Add(new AppliedEvent(@event, eventType, isPublic));
Version++;
InvokeOnMethod(@event);
Expand All @@ -156,19 +184,20 @@ void InvokeOnMethod(object @event)
}
}

void ThrowIfAggreggateRootVersionIsOutOfOrder(CommittedAggregateEvent @event)
{
if (@event.AggregateRootVersion != Version) throw new AggregateRootVersionIsOutOfOrder(@event.AggregateRootVersion, Version);
}

void ThrowIfEventWasAppliedByOtherAggregateRoot(CommittedAggregateEvents events)
{
var aggregateRootId = this.GetAggregateRootId();
if (events.AggregateRoot != this.GetAggregateRootId()) throw new EventWasAppliedByOtherAggregateRoot(events.AggregateRoot, aggregateRootId);
if (events.AggregateRoot != this.GetAggregateRootId())
{
throw new EventWasAppliedByOtherAggregateRoot(events.AggregateRoot, aggregateRootId);
}
}

void ThrowIfEventWasAppliedToOtherEventSource(CommittedAggregateEvents events)
{
if (events.EventSource != EventSourceId) throw new EventWasAppliedToOtherEventSource(events.EventSource, EventSourceId);
if (events.EventSource != EventSourceId)
{
throw new EventWasAppliedToOtherEventSource(events.EventSource, EventSourceId);
}
}
}
}
16 changes: 14 additions & 2 deletions Source/Aggregates/AggregateRootExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ public static bool TryGetOnMethod(this AggregateRoot aggregateRoot, object @even
public static bool IsStateless(this AggregateRoot aggregateRoot)
=> GetHandleMethodsFor(aggregateRoot.GetType()).Count == 0;

/// <summary>
/// Gets all the <see cref="IEnumerable{T}"/> of <see cref="EventType"/> that the
/// </summary>
/// <param name="aggregateRoot"></param>
/// <param name="eventTypes"></param>
/// <returns></returns>
public static IEnumerable<EventType> GetEventTypes(this AggregateRoot aggregateRoot, IEventTypes eventTypes)
=> GetHandleMethodsFor(aggregateRoot.GetType()).Keys.Select(eventTypes.GetFor);

/// <summary>
/// Gets the <see cref="AggregateRootId" /> of an <see cref="AggregateRoot" />.
/// </summary>
Expand All @@ -45,7 +54,10 @@ public static AggregateRootId GetAggregateRootId(this AggregateRoot aggregateRoo
{
var aggregateRootType = aggregateRoot.GetType();
var aggregateRootAttribute = aggregateRootType.GetCustomAttribute<AggregateRootAttribute>();
if (aggregateRootAttribute == null) throw new MissingAggregateRootAttribute(aggregateRootType);
if (aggregateRootAttribute == null)
{
throw new MissingAggregateRootAttribute(aggregateRootType);
}
return aggregateRootAttribute.Type.Id;
}

Expand Down Expand Up @@ -73,4 +85,4 @@ static AggregateRootHandleMethods()
}
}
}
}
}
23 changes: 7 additions & 16 deletions Source/Aggregates/AggregateRootOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ public async Task Perform(Func<TAggregate, Task> method, CancellationToken cance

var aggregateRootId = aggregateRoot.GetAggregateRootId();
activity?.Tag(aggregateRootId);
await ReApplyEvents(aggregateRoot, aggregateRootId, cancellationToken).ConfigureAwait(false);

await Rehydrate(aggregateRoot, aggregateRootId, cancellationToken).ConfigureAwait(false);
_logger.PerformingOn(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.EventSourceId);
await method(aggregateRoot).ConfigureAwait(false);

if (aggregateRoot.AppliedEvents.Any())
{
await CommitAppliedEvents(aggregateRoot, aggregateRootId).ConfigureAwait(false);
Expand All @@ -95,21 +93,14 @@ bool TryGetAggregateRoot(EventSourceId eventSourceId, out TAggregate aggregateRo
return getAggregateRoot.Success;
}

async Task ReApplyEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId, CancellationToken cancellationToken)
Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, CancellationToken cancellationToken)
{
var eventSourceId = aggregateRoot.EventSourceId;
_logger.ReApplyingEventsFor(typeof(TAggregate), aggregateRootId, eventSourceId);

var committedEvents = await _eventStore.FetchForAggregate(aggregateRootId, eventSourceId, cancellationToken).ConfigureAwait(false);
if (committedEvents.HasEvents)
{
_logger.ReApplying(committedEvents.Count);
aggregateRoot.ReApply(committedEvents);
}
else
{
_logger.NoEventsToReApply();
}
_logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId);
var eventTypesToFetch = aggregateRoot.GetEventTypes(_eventTypes);

var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.Rehydrate(committedEventsBatches, cancellationToken);
}

Task<CommittedAggregateEvents> CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId)
Expand Down
4 changes: 2 additions & 2 deletions Source/Aggregates/AppliedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class AppliedEvent
/// <param name="event">The event content.</param>
/// <param name="eventType">The <see cref="EventType" />.</param>
/// <param name="isPublic">Whether the event is public or not.</param>
public AppliedEvent(object @event, EventType eventType, bool isPublic)
public AppliedEvent(object @event, EventType? eventType, bool isPublic)
{
Event = @event;
EventType = eventType;
Expand All @@ -31,7 +31,7 @@ public AppliedEvent(object @event, EventType eventType, bool isPublic)
/// <summary>
/// Gets the event's <see cref="EventType" />.
/// </summary>
public EventType EventType { get; }
public EventType? EventType { get; }

/// <summary>
/// Gets a value indicating whether this event is public or not.
Expand Down
4 changes: 2 additions & 2 deletions Source/Aggregates/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ static partial class Log
[LoggerMessage(0, LogLevel.Debug,"Performing operation on {AggregateRoot} with aggregate root id {AggregateRootId} applying events to event source {EventSource}")]
internal static partial void PerformingOn(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);

[LoggerMessage(0, LogLevel.Debug, "Re-applying events for {AggregateRoot} with aggregate root id {AggregateRootId} with event source id {EventSource}")]
internal static partial void ReApplyingEventsFor(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);
[LoggerMessage(0, LogLevel.Debug, "Rehydrating {AggregateRoot} with aggregate root id {AggregateRootId} with event source id {EventSource}")]
internal static partial void RehydratingAggregateRoot(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);

[LoggerMessage(0, LogLevel.Trace, "Re-applying {NumberOfEvents} events")]
internal static partial void ReApplying(this ILogger logger, int numberOfEvents);
Expand Down
23 changes: 23 additions & 0 deletions Source/Aggregates/NoCommittedAggregateEventsBatches.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.SDK.Events;

namespace Dolittle.SDK.Aggregates;

/// <summary>
/// Exception that gets thrown when the fetched aggregate root event stream has no batches.
/// </summary>
public class NoCommittedAggregateEventsBatches : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="NoCommittedAggregateEventsBatches"/> class.
/// </summary>
/// <param name="aggregateRootId">The aggregate root id.</param>
/// <param name="eventSourceId">The event source id.</param>
public NoCommittedAggregateEventsBatches(AggregateRootId aggregateRootId, EventSourceId eventSourceId)
: base($"No batches of committed aggregate events were received when fetching for aggregate root '{aggregateRootId}' with event source '{eventSourceId}'")
{
}
}
8 changes: 4 additions & 4 deletions Source/Events/Store/AggregaterootVersionIsOutOfOrder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public class AggregateRootVersionIsOutOfOrder : ArgumentException
/// Initializes a new instance of the <see cref="AggregateRootVersionIsOutOfOrder"/> class.
/// </summary>
/// <param name="eventVersion">The <see cref="AggregateRootVersion"/> the Event was applied by.</param>
/// <param name="expectedVersion">Expected <see cref="AggregateRootVersion"/>.</param>
public AggregateRootVersionIsOutOfOrder(AggregateRootVersion eventVersion, AggregateRootVersion expectedVersion)
: base($"Aggregate Root version is out of order. Version '{eventVersion}' from event does not match '{expectedVersion}'")
/// <param name="previousVersion">Previous <see cref="AggregateRootVersion"/>.</param>
public AggregateRootVersionIsOutOfOrder(AggregateRootVersion eventVersion, AggregateRootVersion previousVersion)
: base($"Aggregate Root version is out of order. Version '{eventVersion}' from event is not greater than previous version '{previousVersion}'")
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public Task<CommittedAggregateEvents> Commit(
Action<UncommittedAggregateEventsBuilder> callback,
CancellationToken cancellationToken = default)
{
if (_builder != default) throw new EventBuilderMethodAlreadyCalled("Commit");
if (_builder != default)
{
throw new EventBuilderMethodAlreadyCalled("Commit");
}
_builder = new UncommittedAggregateEventsBuilder(_aggregateRootId, _eventSourceId, _expectedVersion);
callback(_builder);
var uncommittedAggregateEvents = _builder.Build(_eventTypes);
Expand Down
Loading

0 comments on commit 66d6c77

Please sign in to comment.