Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projections index rebuild proof-of-concept #59

Merged
merged 8 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
branches: [ "main" ]

env:
PACKAGE_VERSION: 0.1.3
PACKAGE_VERSION: 0.1.4

jobs:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using CloudFabric.Projections.CosmosDb;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace CloudFabric.EventSourcing.AspNet.CosmosDb.Extensions
{
Expand All @@ -22,30 +23,32 @@ public static IEventSourcingBuilder AddCosmosDbEventStore(
string processorName
)
{
var cosmosClient = new CosmosClient(connectionString, cosmosClientOptions);
services.AddScoped<AggregateRepositoryFactory>((sp) =>
{
var logger = sp.GetRequiredService<ILogger<CosmosDbEventStoreChangeFeedObserver>>();

var cosmosClient = new CosmosClient(connectionString, cosmosClientOptions);

var eventStore = new CosmosDbEventStore(cosmosClient, databaseId, eventsContainerId, itemsContainerId);
eventStore.Initialize().Wait();
var eventStore = new CosmosDbEventStore(cosmosClient, databaseId, eventsContainerId, itemsContainerId);
eventStore.Initialize().Wait();

var eventStoreObserver = new CosmosDbEventStoreChangeFeedObserver(
cosmosClient,
databaseId,
eventsContainerId,
leaseClient,
leaseDatabaseId,
leaseContainerId,
processorName
);
var eventStoreObserver = new CosmosDbEventStoreChangeFeedObserver(
cosmosClient,
databaseId,
eventsContainerId,
leaseClient,
leaseDatabaseId,
leaseContainerId,
processorName,
logger
);

AggregateRepositoryFactory aggregateRepositoryFactory = new AggregateRepositoryFactory(eventStore);
services.AddScoped(sp => aggregateRepositoryFactory);
return new AggregateRepositoryFactory(eventStore);
});

return new EventSourcingBuilder
{
EventStore = eventStore,
Services = services,
ProjectionEventsObserver = eventStoreObserver,
AggregateRepositoryFactory = aggregateRepositoryFactory
Services = services
};
}

Expand Down Expand Up @@ -97,16 +100,16 @@ params Type[] projectionBuildersTypes
// TryAddScoped is used to be able to add a few event stores with separate calls of AddPostgresqlProjections
builder.Services.AddScoped<ProjectionRepositoryFactory>((sp) => projectionsRepositoryFactory);

// add repository for saving rebuild states
var projectionStateRepository = new CosmosDbProjectionRepository<ProjectionRebuildState>(
projectionsConnectionInfo.LoggerFactory,
projectionsConnectionInfo.ConnectionString,
projectionsConnectionInfo.CosmosClientOptions,
projectionsConnectionInfo.DatabaseId,
projectionsConnectionInfo.ContainerId
);
// // add repository for saving rebuild states
// var projectionStateRepository = new CosmosDbProjectionRepository<ProjectionRebuildState>(
// projectionsConnectionInfo.LoggerFactory,
// projectionsConnectionInfo.ConnectionString,
// projectionsConnectionInfo.CosmosClientOptions,
// projectionsConnectionInfo.DatabaseId,
// projectionsConnectionInfo.ContainerId
// );

var projectionsEngine = new ProjectionsEngine(projectionStateRepository);
var projectionsEngine = new ProjectionsEngine();

if (builder.ProjectionEventsObserver == null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using CloudFabric.Projections;
using CloudFabric.Projections.ElasticSearch;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace CloudFabric.EventSourcing.AspNet.ElasticSearch.Extensions
{
public static class ServiceCollectionExtensions
{
// NOTE: projection repositories can't work with different databases for now
public static IEventSourcingBuilder AddElasticSearchProjections(
this IEventSourcingBuilder builder,
ElasticSearchBasicAuthConnectionSettings basicAuthConnectionSettings,
Expand All @@ -16,43 +15,19 @@ public static IEventSourcingBuilder AddElasticSearchProjections(
params Type[] projectionBuildersTypes
)
{
var projectionsRepositoryFactory = new ElasticSearchProjectionRepositoryFactory(
basicAuthConnectionSettings,
loggerFactory,
disableRequestStreaming
builder.ProjectionBuilderTypes = projectionBuildersTypes;

builder.Services.AddScoped<ProjectionRepositoryFactory>(
(sp) => new ElasticSearchProjectionRepositoryFactory(
basicAuthConnectionSettings,
loggerFactory,
disableRequestStreaming
)
);

// TryAddScoped is used to be able to add a few event stores with separate calls of AddPostgresqlProjections
builder.Services.TryAddScoped<ProjectionRepositoryFactory>((sp) => projectionsRepositoryFactory);

// add repository for saving rebuild states
var projectionStateRepository = new ElasticSearchProjectionRepository<ProjectionRebuildState>(
basicAuthConnectionSettings,
loggerFactory,
disableRequestStreaming
);

var projectionsEngine = new ProjectionsEngine(projectionStateRepository);

if (builder.ProjectionEventsObserver == null)
{
throw new ArgumentException("Projection events observer is missing");
}

projectionsEngine.SetEventsObserver(builder.ProjectionEventsObserver);

foreach (var projectionBuilderType in projectionBuildersTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);

projectionsEngine.AddProjectionBuilder(projectionBuilder);
}

builder.ProjectionsEngine = projectionsEngine;

return builder;
}

public static IEventSourcingBuilder AddElasticSearchProjections(
this IEventSourcingBuilder builder,
ElasticSearchApiKeyAuthConnectionSettings apiKeyAuthConnectionSettings,
Expand All @@ -61,40 +36,16 @@ public static IEventSourcingBuilder AddElasticSearchProjections(
params Type[] projectionBuildersTypes
)
{
var projectionsRepositoryFactory = new ElasticSearchProjectionRepositoryFactory(
apiKeyAuthConnectionSettings,
loggerFactory,
disableRequestStreaming
builder.ProjectionBuilderTypes = projectionBuildersTypes;

builder.Services.AddScoped<ProjectionRepositoryFactory>(
(sp) => new ElasticSearchProjectionRepositoryFactory(
apiKeyAuthConnectionSettings,
loggerFactory,
disableRequestStreaming
)
);

// TryAddScoped is used to be able to add a few event stores with separate calls of AddPostgresqlProjections
builder.Services.TryAddScoped<ProjectionRepositoryFactory>((sp) => projectionsRepositoryFactory);

// add repository for saving rebuild states
var projectionStateRepository = new ElasticSearchProjectionRepository<ProjectionRebuildState>(
apiKeyAuthConnectionSettings,
loggerFactory,
disableRequestStreaming
);

var projectionsEngine = new ProjectionsEngine(projectionStateRepository);

if (builder.ProjectionEventsObserver == null)
{
throw new ArgumentException("Projection events observer is missing");
}

projectionsEngine.SetEventsObserver(builder.ProjectionEventsObserver);

foreach (var projectionBuilderType in projectionBuildersTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);

projectionsEngine.AddProjectionBuilder(projectionBuilder);
}

builder.ProjectionsEngine = projectionsEngine;

return builder;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using CloudFabric.EventSourcing.EventStore;
using System.Runtime.CompilerServices;
using CloudFabric.EventSourcing.Domain;
using CloudFabric.EventSourcing.EventStore.InMemory;
using CloudFabric.Projections;
using CloudFabric.Projections.InMemory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace CloudFabric.EventSourcing.AspNet.InMemory.Extensions
{
Expand All @@ -16,22 +18,47 @@ public static IEventSourcingBuilder AddInMemoryEventStore(
Dictionary<(string, string), string> itemsContainer
)
{
var eventStore = new InMemoryEventStore(eventsContainer, itemsContainer);
eventStore.Initialize().Wait();

// add events observer for projections
var eventStoreObserver = new InMemoryEventStoreEventObserver(eventStore);

AggregateRepositoryFactory aggregateRepositoryFactory = new AggregateRepositoryFactory(eventStore);
services.AddScoped(sp => aggregateRepositoryFactory);

return new EventSourcingBuilder
var builder = new EventSourcingBuilder
{
EventStore = eventStore,
Services = services,
ProjectionEventsObserver = eventStoreObserver,
AggregateRepositoryFactory = aggregateRepositoryFactory
Services = services
};

services.AddScoped<IEventStore>(
(sp) =>
{
var eventStore = new InMemoryEventStore(eventsContainer, itemsContainer);
eventStore.Initialize().Wait();

// add events observer for projections
var eventStoreObserver = new InMemoryEventStoreEventObserver(
eventStore, sp.GetRequiredService<ILogger<InMemoryEventStoreEventObserver>>()
);

var projectionsRepositoryFactory = sp.GetService<ProjectionRepositoryFactory>();

// Postgresql's event observer is synchronous - it just handles all calls to npgsql commands, there is no delay
// or log processing. That means that all events are happening in request context, possibly on multiple threads,
// so having one global projections builder is more complicated than simply creating new projections builder for each request.
if (projectionsRepositoryFactory != null)
{
var projectionsEngine = new ProjectionsEngine();
projectionsEngine.SetEventsObserver(eventStoreObserver);

foreach (var projectionBuilderType in builder.ProjectionBuilderTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);

projectionsEngine.AddProjectionBuilder(projectionBuilder);
}

projectionsEngine.StartAsync("").GetAwaiter().GetResult();
}

return eventStore;
}
);

return builder;
}

public static IEventSourcingBuilder AddInMemoryEventStore(this IServiceCollection services)
Expand All @@ -45,45 +72,29 @@ public static IEventSourcingBuilder AddInMemoryEventStore(this IServiceCollectio
public static IEventSourcingBuilder AddRepository<TRepo>(this IEventSourcingBuilder builder)
where TRepo : class
{
if (builder.EventStore == null)
{
throw new ArgumentException("Event store is missing");
}

builder.Services.AddSingleton(sp => ActivatorUtilities.CreateInstance<TRepo>(sp, new object[] { builder.EventStore }));
builder.Services.AddScoped(
sp =>
{
var eventStore = sp.GetRequiredService<IEventStore>();
return ActivatorUtilities.CreateInstance<TRepo>(sp, new object[] { eventStore });
}
);

return builder;
}

// NOTE: projection repositories can't work with different databases for now
public static IEventSourcingBuilder AddInMemoryProjections(
this IEventSourcingBuilder builder,
params Type[] projectionBuildersTypes
)
{
var projectionsRepositoryFactory = new InMemoryProjectionRepositoryFactory();

builder.Services.TryAddScoped<ProjectionRepositoryFactory>((sp) => projectionsRepositoryFactory);

// add repository for saving rebuild states
var projectionStateRepository = new InMemoryProjectionRepository<ProjectionRebuildState>();
builder.ProjectionBuilderTypes = projectionBuildersTypes;

var projectionsEngine = new ProjectionsEngine(projectionStateRepository);

if (builder.ProjectionEventsObserver == null)
builder.Services.AddScoped<ProjectionRepositoryFactory>((sp) =>
{
throw new ArgumentException("Projection events observer is missing");
}

projectionsEngine.SetEventsObserver(builder.ProjectionEventsObserver);

foreach (var projectionBuilderType in projectionBuildersTypes)
{
var projectionBuilder = builder.ConstructProjectionBuilder(projectionBuilderType, projectionsRepositoryFactory);

projectionsEngine.AddProjectionBuilder(projectionBuilder);
}

builder.ProjectionsEngine = projectionsEngine;
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
return new InMemoryProjectionRepositoryFactory(loggerFactory);
});

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\CloudFabric.Projections.Worker\CloudFabric.Projections.Worker.csproj" />
<ProjectReference Include="..\..\Implementations\CloudFabric.EventSourcing.EventStore.Postgresql\CloudFabric.EventSourcing.EventStore.Postgresql.csproj" />
<ProjectReference Include="..\..\Implementations\CloudFabric.Projections.Postgresql\CloudFabric.Projections.Postgresql.csproj" />
<ProjectReference Include="..\CloudFabric.EventSourcing.AspNet\CloudFabric.EventSourcing.AspNet.csproj" />
Expand Down
Loading
Loading