Skip to content

Commit

Permalink
Projections rebuild - all tests ok
Browse files Browse the repository at this point in the history
  • Loading branch information
ustims committed Aug 13, 2023
1 parent 7eecbbe commit 8bb28db
Show file tree
Hide file tree
Showing 39 changed files with 844 additions and 1,150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using CloudFabric.Projections;
using CloudFabric.Projections.InMemory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace CloudFabric.EventSourcing.AspNet.InMemory.Extensions
{
Expand All @@ -25,7 +27,9 @@ public static IEventSourcingBuilder AddInMemoryEventStore(
eventStore.Initialize().Wait();
// add events observer for projections
var eventStoreObserver = new InMemoryEventStoreEventObserver(eventStore);
var eventStoreObserver = new InMemoryEventStoreEventObserver(
eventStore, sp.GetRequiredService<ILogger<InMemoryEventStoreEventObserver>>()
);
var projectionsRepositoryFactory = sp.GetService<ProjectionRepositoryFactory>();
Expand Down Expand Up @@ -75,7 +79,11 @@ params Type[] projectionBuildersTypes
{
builder.ProjectionBuilderTypes = projectionBuildersTypes;

builder.Services.AddScoped<ProjectionRepositoryFactory>((sp) => new InMemoryProjectionRepositoryFactory());
builder.Services.AddScoped<ProjectionRepositoryFactory>((sp) =>
{
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
return new InMemoryProjectionRepositoryFactory(loggerFactory);
});

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace CloudFabric.EventSourcing.AspNet.Postgresql.Extensions
class PostgresqlEventSourcingScope
{
public IEventStore EventStore { get; set; }
public IEventsObserver EventsObserver { get; set; }
public EventsObserver EventsObserver { get; set; }
public ProjectionsEngine? ProjectionsEngine { get; set; }
}

Expand Down Expand Up @@ -91,7 +91,7 @@ Func<IServiceProvider, IPostgresqlEventStoreConnectionInformationProvider> conne
}
);

services.AddScoped<IEventsObserver>(
services.AddScoped<EventsObserver>(
(sp) =>
{
var eventSourcingScope = sp.GetRequiredService<PostgresqlEventSourcingScope>();
Expand Down Expand Up @@ -129,9 +129,11 @@ params Type[] projectionBuildersTypes
builder.Services.AddScoped<ProjectionRepositoryFactory>(
(sp) =>
{
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
var connectionInformationProvider = sp.GetRequiredService<IPostgresqlEventStoreConnectionInformationProvider>();
return new PostgresqlProjectionRepositoryFactory(
loggerFactory,
connectionInformationProvider.GetConnectionInformation().ConnectionId,
projectionsConnectionString
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class EventSourcingBuilder : IEventSourcingBuilder
public string? ProjectionsConnectionString { get; set; }
public Type[]? ProjectionBuilderTypes { get; set; }

public IEventsObserver ProjectionEventsObserver { get; set; }
public EventsObserver ProjectionEventsObserver { get; set; }

public dynamic ConstructProjectionBuilder(Type projectionBuilderType, ProjectionRepositoryFactory projectionsRepositoryFactory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IEventSourcingBuilder

IServiceCollection Services { get; set; }

IEventsObserver ProjectionEventsObserver { get; set; }
EventsObserver ProjectionEventsObserver { get; set; }

ProjectionsEngine ProjectionsEngine { get; set; }
string ProjectionsConnectionString { get; set; }
Expand Down
9 changes: 9 additions & 0 deletions CloudFabric.EventSourcing.EventStore/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ public interface IEventStore

Task<EventStream> LoadStreamAsync(Guid streamId, string partitionKey, int fromVersion, CancellationToken cancellationToken = default);

Task<List<IEvent>> LoadEventsAsync(
string? partitionKey,
DateTime? dateFrom = null,
int limit = 250,
CancellationToken cancellationToken = default
);

Task<bool> AppendToStreamAsync(
EventUserInfo eventUserInfo,
Guid streamId,
Expand All @@ -20,6 +27,8 @@ Task<bool> AppendToStreamAsync(

Task Initialize(CancellationToken cancellationToken = default);

Task<EventStoreStatistics> GetStatistics(CancellationToken cancellationToken = default);

Task DeleteAll(CancellationToken cancellationToken = default);

Task<bool> HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default);
Expand Down
25 changes: 3 additions & 22 deletions CloudFabric.EventSourcing.Tests/DynamicProjectionSchemaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public abstract class DynamicProjectionSchemaTests

protected abstract ProjectionRepositoryFactory GetProjectionRepositoryFactory();

protected abstract IEventsObserver GetEventStoreEventsObserver();
protected abstract EventsObserver GetEventStoreEventsObserver();

private const string _projectionsSchemaName = "orders-projections";

Expand All @@ -154,7 +154,7 @@ public async Task Cleanup()
}
}

private async Task<(ProjectionsEngine, IProjectionRepository)> PrepareProjections(IEventsObserver eventsObserver, ProjectionDocumentSchema schema)
private async Task<(ProjectionsEngine, IProjectionRepository)> PrepareProjections(EventsObserver eventsObserver, ProjectionDocumentSchema schema)
{
// Repository containing projections - `view models` of orders
var ordersListProjectionsRepository = GetProjectionRepositoryFactory()
Expand All @@ -175,7 +175,7 @@ public async Task Cleanup()
return (projectionsEngine, ordersListProjectionsRepository);
}

private ProjectionsRebuildProcessor PrepareProjectionsRebuildProcessor(IEventsObserver eventsObserver, ProjectionDocumentSchema projectionDocumentSchema)
private ProjectionsRebuildProcessor PrepareProjectionsRebuildProcessor(EventsObserver eventsObserver, ProjectionDocumentSchema projectionDocumentSchema)
{
return new ProjectionsRebuildProcessor(
GetProjectionRepositoryFactory().GetProjectionsIndexStateRepository(),
Expand Down Expand Up @@ -541,25 +541,6 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj
orderProjectionWithNewSchemaTotalPrice["Id"].Should().Be(order.Id);
orderProjectionWithNewSchemaTotalPrice["ItemsCount"].Should().Be(5);

// Important! After we added a new projection field it's required
// to re-run all projection builders from the first event (rebuild all projections)
// Since we didn't rebuild projections, new field will only have data for events that happened after the field was added
// Hence total price is nly the price of last added item.
//orderProjectionWithNewSchemaTotalPrice["TotalPrice"].Should().Be(6.95m);

// var query = new ProjectionQuery();
// query.Filters = new List<Filter>()
// {
// new Filter("TotalPrice", FilterOperator.Greater, 6m)
// };
//
// var searchResult = await TestHelpers.RepeatUntil(
// () => ordersListProjectionsRepository.Query(query),
// (r) => r.Records.Count == 1,
// ProjectionsUpdateDelay
// );
// searchResult.Records.Count.Should().Be(1);

await projectionsRebuildProcessor.RebuildProjectionsThatRequireRebuild();

var orderProjectionWithNewSchemaTotalPriceAfterRebuild = await ordersListProjectionsRepository
Expand Down
18 changes: 1 addition & 17 deletions CloudFabric.EventSourcing.Tests/OrderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,6 @@ public async Task TestRebuildAllOrdersProjections()
firstOrderProjection.Should().BeNull();
secondOrderProjection.Should().BeNull();

// rebuild the firstOrder document
//await ProjectionsEngine.StartRebuildAsync(instanceName, PartitionKeys.GetOrderPartitionKey());

// wait for the rebuild state to be indexed
// await Task.Delay(ProjectionsUpdateDelay);
//
// // wait for the rebuild to finish
// ProjectionRebuildState rebuildState;
// do
// {
// rebuildState = await ProjectionsEngine.GetRebuildState(instanceName, PartitionKeys.GetOrderPartitionKey());
// await Task.Delay(10);
// } while (rebuildState.Status != RebuildStatus.Completed && rebuildState.Status != RebuildStatus.Failed);
//
// rebuildState.Status.Should().Be(RebuildStatus.Completed);

await ProjectionsRepository.DeleteAll();
await ProjectionsRepository.EnsureIndex();
await ProjectionsRebuildProcessor.RebuildProjectionsThatRequireRebuild();
Expand Down Expand Up @@ -713,4 +697,4 @@ public async Task TestHardDeleteOrder()
var order2 = await orderRepository.LoadAsync(id, PartitionKeys.GetOrderPartitionKey());
order2.Should().BeNull();
}
}
}
16 changes: 11 additions & 5 deletions CloudFabric.EventSourcing.Tests/TestsBaseWithProjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public abstract class TestsBaseWithProjections<TProjectionDocument, TProjectionB
protected TimeSpan ProjectionsUpdateDelay { get; set; } = TimeSpan.FromMilliseconds(1000);
protected abstract Task<IEventStore> GetEventStore();
protected abstract ProjectionRepositoryFactory GetProjectionRepositoryFactory();
protected abstract IEventsObserver GetEventStoreEventsObserver();
protected abstract EventsObserver GetEventStoreEventsObserver();

protected ProjectionsEngine ProjectionsEngine;
protected IProjectionRepository<TProjectionDocument> ProjectionsRepository;
Expand Down Expand Up @@ -83,10 +83,16 @@ public async Task Initialize()
[TestCleanup]
public async Task Cleanup()
{
await ProjectionsEngine.StopAsync();

var store = await GetEventStore();
await store.DeleteAll();
try
{
await ProjectionsEngine.StopAsync();

var store = await GetEventStore();
await store.DeleteAll();
}
catch
{
}

try
{
Expand Down
12 changes: 6 additions & 6 deletions CloudFabric.Projections.Worker/ProjectionsRebuildProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,18 @@ public async Task<bool> RebuildOneProjectionWhichRequiresRebuild(

indexToRebuild.TotalEventsToProcess = eventStoreStatistics.TotalEventsCount;

await _projectionRepository.UpdateProjectionRebuildStats(projectionIndexState);
await _projectionRepository.SaveProjectionIndexState(projectionIndexState);

await projectionsEngine.ReplayEventsAsync(
$"{Environment.MachineName}-{Environment.ProcessId}", null, indexToRebuild.LastProcessedEventTimestamp,
250,
async Task(IEvent lastProcessedEvent) =>
async Task(int eventsProcessed, IEvent lastProcessedEvent) =>
{
indexToRebuild.RebuildEventsProcessed += 250;
indexToRebuild.RebuildEventsProcessed += eventsProcessed;
indexToRebuild.LastProcessedEventTimestamp = lastProcessedEvent.Timestamp;
indexToRebuild.RebuildHealthCheckAt = DateTime.UtcNow;
await _projectionRepository.UpdateProjectionRebuildStats(projectionIndexState);
await _projectionRepository.SaveProjectionIndexState(projectionIndexState);
_logger.LogInformation("Processed {EventsProcessed}/{TotalEventsInEventStore}",
indexToRebuild.RebuildEventsProcessed, indexToRebuild.TotalEventsToProcess
Expand All @@ -91,9 +91,9 @@ async Task(IEvent lastProcessedEvent) =>
indexToRebuild.RebuildHealthCheckAt = DateTime.UtcNow;
indexToRebuild.RebuildCompletedAt = DateTime.UtcNow;

await _projectionRepository.UpdateProjectionRebuildStats(projectionIndexState);
await _projectionRepository.SaveProjectionIndexState(projectionIndexState);
}

return true;
}
}
}
127 changes: 127 additions & 0 deletions CloudFabric.Projections/EventsObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
using CloudFabric.EventSourcing.EventStore;
using Microsoft.Extensions.Logging;

namespace CloudFabric.Projections;

public abstract class EventsObserver
{
protected Func<IEvent, Task>? _eventHandler;
protected readonly ILogger<EventsObserver> _logger;
protected readonly IEventStore _eventStore;

protected EventsObserver(IEventStore eventStore, ILogger<EventsObserver> logger)
{
_eventStore = eventStore;
_logger = logger;
}

public void SetEventHandler(Func<IEvent, Task> eventHandler)
{
_eventHandler = eventHandler;
}

public abstract Task StartAsync(string instanceName);

public abstract Task StopAsync();

public virtual async Task ReplayEventsForOneDocumentAsync(Guid documentId, string partitionKey)
{
var stream = await _eventStore.LoadStreamAsync(documentId, partitionKey);

foreach (var @event in stream.Events)
{
await EventStoreOnEventAdded(@event);
}
}

public virtual Task<EventStoreStatistics> GetEventStoreStatistics()
{
return _eventStore.GetStatistics();
}

/// <summary>
/// Reads all events and runs event handlers on them (basically, "replays" those events). Needed for projections (materialized views)
/// rebuild.
/// </summary>
/// <param name="instanceName">Used for tracking purposes. You can pass machineName or processId here.</param>
/// <param name="partitionKey">PartitionKey to filter all events by.</param>
/// <param name="dateFrom">Skip events which happened prior to this date.</param>
/// <param name="chunkSize">How many events to load at a time.</param>
/// <param name="chunkProcessedCallback">Function that will be called after each chunk of `chunkSize` is processed. Arguments: number of processed events (can be lower than chunkSize), last processed event</param>
/// <param name="cancellationToken">This is a long-running operation, so make sure to pass correct CancellationToken here.</param>
/// <returns></returns>
public virtual async Task ReplayEventsAsync(
string instanceName,
string? partitionKey,
DateTime? dateFrom,
int chunkSize = 250,
Func<int, IEvent, Task>? chunkProcessedCallback = null,
CancellationToken cancellationToken = default
) {
_logger.LogInformation("Replaying events {InstanceName} from {DateFrom}",
instanceName,
dateFrom
);

var lastEventDateTime = dateFrom;
var totalEventsProcessed = 0;

while (true)
{
var chunk = await _eventStore.LoadEventsAsync(
partitionKey,
lastEventDateTime,
chunkSize,
cancellationToken
);

if (chunk.Count <= 0)
{
break;
}

foreach (var @event in chunk)
{
await EventStoreOnEventAdded(@event);
}

var lastEvent = chunk.Last();
lastEventDateTime = lastEvent.Timestamp;
totalEventsProcessed += chunk.Count;

_logger.LogInformation(
"Replayed {ReplayedEventsCount} {InstanceName}, " +
"last event timestamp: {LastEventDateTime}",
chunk.Count,
instanceName,
lastEvent.Timestamp
);

if (chunkProcessedCallback != null)
{
await chunkProcessedCallback(chunk.Count, lastEvent);
}

if (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Cancellation requested. Processed {TotalEventsProcessed} {InstanceName}",
totalEventsProcessed,
instanceName
);

break;
}
}
}

protected async Task EventStoreOnEventAdded(IEvent e)
{
if (_eventHandler == null)
{
throw new InvalidOperationException(
"Can't process an event: no eventHandler was set. Please call SetEventHandler before calling StartAsync.");
}

await _eventHandler(e);
}
}
Loading

0 comments on commit 8bb28db

Please sign in to comment.