Skip to content

Commit

Permalink
Merge pull request #66 from Tech-Fabric/feature-projections-rebuild
Browse files Browse the repository at this point in the history
Feature projections rebuild
  • Loading branch information
ustims committed Aug 29, 2023
2 parents 2e51581 + e730d43 commit 0134e49
Show file tree
Hide file tree
Showing 70 changed files with 9,303 additions and 495 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
branches: [ "main" ]

env:
PACKAGE_VERSION: 0.1.6
PACKAGE_VERSION: 0.2.0

jobs:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,26 @@ params Type[] projectionBuildersTypes
// projectionsConnectionInfo.ContainerId
// );

var projectionsEngine = new ProjectionsEngine();

if (builder.ProjectionEventsObserver == null)
{
throw new ArgumentException("Projection events observer is missing");
}

projectionsEngine.SetEventsObserver(builder.ProjectionEventsObserver);

foreach (var projectionBuilderType in projectionBuildersTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);

projectionsEngine.AddProjectionBuilder(projectionBuilder);
}

builder.ProjectionsEngine = projectionsEngine;
// TODO: this needs refactoring to scoped (see postgresql example)
// var projectionsEngine = new ProjectionsEngine();
//
// if (builder.ProjectionEventsObserver == null)
// {
// throw new ArgumentException("Projection events observer is missing");
// }
//
// projectionsEngine.SetEventsObserver(builder.ProjectionEventsObserver);
//
// foreach (var projectionBuilderType in projectionBuildersTypes)
// {
// var projectionBuilder = builder.ConstructProjectionBuilder(
// projectionBuilderType,
// projectionsRepositoryFactory, new AggregateRepositoryFactory(builder.EventStore), serviceProvider, ProjectionOperationIndexSelector.Write);
//
// projectionsEngine.AddProjectionBuilder(projectionBuilder);
// }

//builder.ProjectionsEngine = projectionsEngine;

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ public static IEventSourcingBuilder AddInMemoryEventStore(
foreach (var projectionBuilderType in builder.ProjectionBuilderTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);
var projectionBuilder = builder.ConstructProjectionBuilder(
projectionBuilderType,
projectionsRepositoryFactory,
new AggregateRepositoryFactory(eventStore),
sp,
ProjectionOperationIndexSelector.Write
);
projectionsEngine.AddProjectionBuilder(projectionBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,10 @@ public static IEventSourcingBuilder AddPostgresqlEventStore(
string itemsTableName
)
{
services.AddPostgresqlEventStore((sp) =>
new PostgresqlEventStoreStaticConnectionInformationProvider(eventsConnectionString, eventsTableName, itemsTableName)
return services.AddPostgresqlEventStore(
(sp) =>
new PostgresqlEventStoreStaticConnectionInformationProvider(eventsConnectionString, eventsTableName, itemsTableName)
);

return new EventSourcingBuilder
{
Services = services
};
}

public static IEventSourcingBuilder AddPostgresqlEventStore(
Expand Down Expand Up @@ -74,11 +70,20 @@ Func<IServiceProvider, IPostgresqlEventStoreConnectionInformationProvider> conne
scope.ProjectionsEngine = new ProjectionsEngine();
scope.ProjectionsEngine.SetEventsObserver(scope.EventsObserver);
foreach (var projectionBuilderType in builder.ProjectionBuilderTypes)
if (builder.ProjectionBuilderTypes != null)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);
scope.ProjectionsEngine.AddProjectionBuilder(projectionBuilder);
foreach (var projectionBuilderType in builder.ProjectionBuilderTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(
projectionBuilderType,
projectionsRepositoryFactory,
new AggregateRepositoryFactory(scope.EventStore),
sp,
ProjectionOperationIndexSelector.Write
);
scope.ProjectionsEngine.AddProjectionBuilder(projectionBuilder);
}
}
scope.ProjectionsEngine.StartAsync(connectionInformationProvider.GetConnectionInformation().ConnectionId).GetAwaiter().GetResult();
Expand Down Expand Up @@ -108,6 +113,15 @@ Func<IServiceProvider, IPostgresqlEventStoreConnectionInformationProvider> conne
}
);

services.AddScoped<AggregateRepositoryFactory>(
(sp) =>
{
var eventSourcingScope = sp.GetRequiredService<PostgresqlEventSourcingScope>();
return new AggregateRepositoryFactory(eventSourcingScope.EventStore);
}
);

services.AddScoped<IStoreRepository>(
(sp) =>
{
Expand Down Expand Up @@ -167,8 +181,8 @@ params Type[] projectionBuildersTypes
return new PostgresqlProjectionRepositoryFactory(
loggerFactory,
connectionInformationProvider.GetConnectionInformation().ConnectionId,
projectionsConnectionString
connectionInformationProvider.GetConnectionInformation().ConnectionString,
connectionInformationProvider.GetConnectionInformation().ConnectionId
);
}
);
Expand All @@ -178,48 +192,57 @@ params Type[] projectionBuildersTypes

public static IEventSourcingBuilder AddProjectionsRebuildProcessor(this IEventSourcingBuilder builder)
{
builder.Services.AddSingleton<ProjectionsRebuildProcessor>((sp) => {
return new ProjectionsRebuildProcessor(
sp.GetRequiredService<ProjectionRepositoryFactory>().GetProjectionRepository(null),
async (string connectionId) =>
{
var connectionInformationProvider = sp.GetRequiredService<IPostgresqlEventStoreConnectionInformationProvider>();
var connectionInformation = connectionInformationProvider.GetConnectionInformation(connectionId);
var eventStore = new PostgresqlEventStore(
connectionInformation.ConnectionString, connectionInformation.TableName, connectionInformation.ItemsTableName
);
var eventObserver = new PostgresqlEventStoreEventObserver(
(PostgresqlEventStore)eventStore,
sp.GetRequiredService<ILogger<PostgresqlEventStoreEventObserver>>()
);
var projectionsEngine = new ProjectionsEngine();
foreach (var projectionBuilderType in builder.ProjectionBuilderTypes)
builder.Services.AddSingleton<ProjectionsRebuildProcessor>(
(sp) =>
{
var rebuildProcessorScope = sp.CreateScope();
return new ProjectionsRebuildProcessor(
rebuildProcessorScope.ServiceProvider.GetRequiredService<ProjectionRepositoryFactory>().GetProjectionsIndexStateRepository(),
async (string connectionId) =>
{
var projectionBuilder = builder.ConstructProjectionBuilder(
projectionBuilderType,
sp.GetRequiredService<ProjectionRepositoryFactory>()
var connectionInformationProvider =
rebuildProcessorScope.ServiceProvider.GetRequiredService<IPostgresqlEventStoreConnectionInformationProvider>();
var connectionInformation = connectionInformationProvider.GetConnectionInformation(connectionId);
var eventStore = new PostgresqlEventStore(
connectionInformation.ConnectionString, connectionInformation.TableName, connectionInformation.ItemsTableName
);
projectionsEngine.AddProjectionBuilder(projectionBuilder);
}
projectionsEngine.SetEventsObserver(eventObserver);
var eventObserver = new PostgresqlEventStoreEventObserver(
(PostgresqlEventStore)eventStore,
rebuildProcessorScope.ServiceProvider.GetRequiredService<ILogger<PostgresqlEventStoreEventObserver>>()
);
var projectionsEngine = new ProjectionsEngine();
foreach (var projectionBuilderType in builder.ProjectionBuilderTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(
projectionBuilderType,
rebuildProcessorScope.ServiceProvider.GetRequiredService<ProjectionRepositoryFactory>(),
new AggregateRepositoryFactory(eventStore),
rebuildProcessorScope.ServiceProvider,
ProjectionOperationIndexSelector.ProjectionRebuild
);
// no need to listen - we are attaching this projections engine to test event store which is already being observed
// by tests projections engine (see PrepareProjections method)
//await projectionsEngine.StartAsync("TestInstance");
projectionsEngine.AddProjectionBuilder(projectionBuilder);
}
return projectionsEngine;
},
sp.GetRequiredService<ILogger<ProjectionsRebuildProcessor>>()
);
});
projectionsEngine.SetEventsObserver(eventObserver);
// no need to listen - we are attaching this projections engine to test event store which is already being observed
// by tests projections engine (see PrepareProjections method)
//await projectionsEngine.StartAsync("TestInstance");
return projectionsEngine;
},
rebuildProcessorScope.ServiceProvider.GetRequiredService<ILogger<ProjectionsRebuildProcessor>>()
);
}
);

builder.Services.AddHostedService<ProjectionsRebuildProcessorHostedService>();

return builder;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Reflection;
using CloudFabric.EventSourcing.Domain;
using CloudFabric.EventSourcing.EventStore;
using CloudFabric.Projections;
Expand All @@ -9,8 +10,6 @@ public class EventSourcingBuilder : IEventSourcingBuilder
{
public IEventStore EventStore { get; set; }

public AggregateRepositoryFactory AggregateRepositoryFactory { get; set; }

public IServiceCollection Services { get; set; }

public ProjectionsEngine? ProjectionsEngine { get; set; }
Expand All @@ -19,29 +18,55 @@ public class EventSourcingBuilder : IEventSourcingBuilder

public EventsObserver ProjectionEventsObserver { get; set; }

public dynamic ConstructProjectionBuilder(Type projectionBuilderType, ProjectionRepositoryFactory projectionsRepositoryFactory)
public dynamic ConstructProjectionBuilder(
Type projectionBuilderType,
ProjectionRepositoryFactory projectionsRepositoryFactory,
AggregateRepositoryFactory aggregateRepositoryFactory,
IServiceProvider serviceProvider,
ProjectionOperationIndexSelector indexSelector
)
{
dynamic? projectionBuilder = null;

ConstructorInfo projectionBuilderConstructor = projectionBuilderType.GetConstructors().First();

var constructorArguments = new List<dynamic>();
foreach (var arg in projectionBuilderConstructor.GetParameters())
{
// parameters such as AggregateRepositoryFactory or ProjectionRepositoryFactory are bound to event sourcing scope
// and we can't get them from serviceProvider - that will cause infinite recursion loop, so we provide them separately from current scope
if (arg.ParameterType == typeof(AggregateRepositoryFactory))
{
constructorArguments.Add(aggregateRepositoryFactory);
}
else if (arg.ParameterType == typeof(ProjectionRepositoryFactory))
{
constructorArguments.Add(projectionsRepositoryFactory);
}
else if (arg.ParameterType == typeof(ProjectionOperationIndexSelector))
{
constructorArguments.Add(indexSelector);
}
else
{
constructorArguments.Add(serviceProvider.GetRequiredService(arg.ParameterType));
}
}

// There are two types of projection builders:
// First one is ProjectionBuilder<ProjectionDocument> and works with strict projection documents represented by class
// Second one is just ProjectionBuilder - those projections do not have strict schema and work with raw dictionary {key: value} type of documents.
if (projectionBuilderType?.BaseType?.GenericTypeArguments.Length > 0 &&
if (projectionBuilderType?.BaseType?.GenericTypeArguments.Length > 0 &&
projectionBuilderType.BaseType.GenericTypeArguments.Any(ta => ta.BaseType == typeof(ProjectionDocument)))
{
projectionBuilder = (IProjectionBuilder<ProjectionDocument>?)Activator.CreateInstance(
projectionBuilderType, new object[]
{
projectionsRepositoryFactory, this.AggregateRepositoryFactory
}
projectionBuilderType, constructorArguments.ToArray()
);
}
else
{
projectionBuilder = (IProjectionBuilder)Activator.CreateInstance(
projectionBuilderType, new object[]
{
projectionsRepositoryFactory, this.AggregateRepositoryFactory
}
projectionBuilderType, constructorArguments.ToArray()
);
}

Expand All @@ -52,5 +77,4 @@ public dynamic ConstructProjectionBuilder(Type projectionBuilderType, Projection

return projectionBuilder;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ public interface IEventSourcingBuilder
{
IEventStore EventStore { get; set; }

public AggregateRepositoryFactory AggregateRepositoryFactory { get; set; }

IServiceCollection Services { get; set; }

EventsObserver ProjectionEventsObserver { get; set; }
Expand All @@ -19,5 +17,11 @@ public interface IEventSourcingBuilder
string ProjectionsConnectionString { get; set; }
Type[] ProjectionBuilderTypes { get; set; }

dynamic ConstructProjectionBuilder(Type projectionBuilderType, ProjectionRepositoryFactory projectionsRepositoryFactory);
dynamic ConstructProjectionBuilder(
Type projectionBuilderType,
ProjectionRepositoryFactory projectionsRepositoryFactory,
AggregateRepositoryFactory aggregateRepositoryFactory,
IServiceProvider serviceProvider,
ProjectionOperationIndexSelector indexSelector
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ IOptions<ProjectionsRebuildProcessorOptions> options
}

public async Task StartAsync(CancellationToken cancellationToken)
{
#pragma warning disable CS4014
Task.Run(() => RunAsync(cancellationToken), cancellationToken);
#pragma warning restore CS4014
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

private async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -26,9 +38,4 @@ public async Task StartAsync(CancellationToken cancellationToken)
await Task.Delay(1000, cancellationToken);
}
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
9 changes: 9 additions & 0 deletions CloudFabric.EventSourcing.EventStore/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@ public record Event : IEvent
public string PartitionKey { get; set; }

public string AggregateType { get; set; }

public Event()
{
}

public Event(Guid aggregateId)
{
AggregateId = aggregateId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ public static List<string> GetFacetablePropertyNames<T>()
.Invoke(null, new object[] { });
}

properties.Add(prop, (propertyAttribute, nestedProperties));
if (properties.Keys.All(p => p.Name != prop.Name))
{
properties.Add(prop, (propertyAttribute, nestedProperties));
}
}
}
}
Expand Down
Loading

0 comments on commit 0134e49

Please sign in to comment.