From 71e6de444c6ce61f9fa17fd36d93aa1a7b1b371c Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 2 May 2024 15:18:06 +0200 Subject: [PATCH] Changed custom IProjections like KafkaProducer, SignalRProducer, ElasticProjection to subscriptions --- Core.Marten/MartenConfig.cs | 28 +---- .../Subscriptions/MartenEventPublisher.cs | 97 ++++++++-------- .../Subscriptions/MartenSubscription.cs | 56 ---------- Core/Commands/InMemoryCommandBus.cs | 23 +--- .../Helpdesk.Api/Core/Kafka/KafkaProducer.cs | 26 +++-- .../Helpdesk.Api/Program.cs | 2 +- .../Helpdesk.Api/Core/Kafka/KafkaProducer.cs | 26 +++-- .../Core/SignalR/SignalRProducer.cs | 36 +++--- Sample/Helpdesk/Helpdesk.Api/Program.cs | 7 +- .../ElasticsearchProjectionTests.cs | 12 +- .../Projections/ElasticsearchProjection.cs | 105 +++++++++++------- .../ElasticsearchProjectionConfig.cs | 11 +- docker-compose.ci.yml | 13 ++- docker-compose.yml | 16 ++- 14 files changed, 223 insertions(+), 235 deletions(-) delete mode 100644 Core.Marten/Subscriptions/MartenSubscription.cs diff --git a/Core.Marten/MartenConfig.cs b/Core.Marten/MartenConfig.cs index 9114b9fb6..1389b8bc6 100644 --- a/Core.Marten/MartenConfig.cs +++ b/Core.Marten/MartenConfig.cs @@ -57,11 +57,12 @@ public static IServiceCollection AddMarten( { services .AddScoped() - .AddMarten(sp => SetStoreOptions(sp, martenConfig, configureOptions)) + .AddMarten(options => SetStoreOptions(options, martenConfig, configureOptions)) .UseLightweightSessions() .ApplyAllDatabaseChangesOnStartup() //.OptimizeArtifactWorkflow() - .AddAsyncDaemon(martenConfig.DaemonMode); + .AddAsyncDaemon(martenConfig.DaemonMode) + .AddSubscriptionWithServices(ServiceLifetime.Scoped); if (useExternalBus) services.AddMartenAsyncCommandBus(); @@ -69,13 +70,12 @@ public static IServiceCollection AddMarten( return services; } - private static StoreOptions SetStoreOptions( - IServiceProvider serviceProvider, + private static void SetStoreOptions( + StoreOptions options, MartenConfig martenConfig, Action? configureOptions = null ) { - var options = new StoreOptions(); options.Connection(martenConfig.ConnectionString); options.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate; @@ -92,22 +92,6 @@ private static StoreOptions SetStoreOptions( options.Projections.Errors.SkipSerializationErrors = false; options.Projections.Errors.SkipUnknownEvents = false; - options.Projections.Add( - new MartenSubscription( - new[] - { - new MartenEventPublisher( - serviceProvider, - serviceProvider.GetRequiredService(), - serviceProvider.GetRequiredService>() - ) - }, - serviceProvider.GetRequiredService>() - ), - ProjectionLifecycle.Async, - "MartenSubscription" - ); - if (martenConfig.UseMetadata) { options.Events.MetadataConfig.CausationIdEnabled = true; @@ -116,7 +100,5 @@ private static StoreOptions SetStoreOptions( } configureOptions?.Invoke(options); - - return options; } } diff --git a/Core.Marten/Subscriptions/MartenEventPublisher.cs b/Core.Marten/Subscriptions/MartenEventPublisher.cs index 809450036..2a0bd8964 100644 --- a/Core.Marten/Subscriptions/MartenEventPublisher.cs +++ b/Core.Marten/Subscriptions/MartenEventPublisher.cs @@ -2,62 +2,71 @@ using Core.OpenTelemetry; using Marten; using Marten.Events; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Events.Projections; +using Marten.Subscriptions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace Core.Marten.Subscriptions; -public class MartenEventPublisher: IMartenEventsConsumer +public class MartenEventPublisher( + IServiceProvider serviceProvider, + IActivityScope activityScope, + ILogger logger +): SubscriptionBase { - private readonly IServiceProvider serviceProvider; - private readonly IActivityScope activityScope; - private readonly ILogger logger; - - public MartenEventPublisher( - IServiceProvider serviceProvider, - IActivityScope activityScope, - ILogger logger - ) - { - this.serviceProvider = serviceProvider; - this.activityScope = activityScope; - this.logger = logger; - } - - public async Task ConsumeAsync( - IDocumentOperations documentOperations, - IReadOnlyList streamActions, - CancellationToken cancellationToken + public override async Task ProcessEventsAsync( + EventRange eventRange, + ISubscriptionController subscriptionController, + IDocumentOperations operations, + CancellationToken token ) { - foreach (var @event in streamActions.SelectMany(streamAction => streamAction.Events)) + var lastProcessed = eventRange.SequenceFloor; + try { - var parentContext = - TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata); + foreach (var @event in eventRange.Events) + { + var parentContext = + TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata); + + await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ProcessEventsAsync)}", + async (_, ct) => + { + using var scope = serviceProvider.CreateScope(); + var eventBus = scope.ServiceProvider.GetRequiredService(); + + var eventMetadata = new EventMetadata( + @event.Id.ToString(), + (ulong)@event.Version, + (ulong)@event.Sequence, + parentContext + ); - await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ConsumeAsync)}", - async (_, ct) => - { - using var scope = serviceProvider.CreateScope(); - var eventBus = scope.ServiceProvider.GetRequiredService(); + await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct) + .ConfigureAwait(false); - var eventMetadata = new EventMetadata( - @event.Id.ToString(), - (ulong)@event.Version, - (ulong)@event.Sequence, - parentContext - ); + // TODO: you can also differentiate based on the exception + // await controller.RecordDeadLetterEventAsync(e, ex); + }, + new StartActivityOptions + { + Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } }, + Parent = parentContext.ActivityContext + }, + token + ).ConfigureAwait(false); + } - await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct) - .ConfigureAwait(false); - }, - new StartActivityOptions - { - Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } }, - Parent = parentContext.ActivityContext - }, - cancellationToken - ).ConfigureAwait(false); + return NullChangeListener.Instance; + } + catch (Exception exc) + { + logger.LogError("Error while processing Marten Subscription: {ExceptionMessage}", exc.Message); + await subscriptionController.ReportCriticalFailureAsync(exc, lastProcessed).ConfigureAwait(false); + throw; } } diff --git a/Core.Marten/Subscriptions/MartenSubscription.cs b/Core.Marten/Subscriptions/MartenSubscription.cs deleted file mode 100644 index 1f6392f44..000000000 --- a/Core.Marten/Subscriptions/MartenSubscription.cs +++ /dev/null @@ -1,56 +0,0 @@ -using Marten; -using Marten.Events; -using Marten.Events.Projections; -using Microsoft.Extensions.Logging; - -namespace Core.Marten.Subscriptions; - -public class MartenSubscription: IProjection -{ - private readonly IEnumerable consumers; - private readonly ILogger logger; - - public MartenSubscription( - IEnumerable consumers, - ILogger logger - ) - { - this.consumers = consumers; - this.logger = logger; - } - - public void Apply( - IDocumentOperations operations, - IReadOnlyList streams - ) => - throw new NotImplementedException("Subscriptions should work only in the async scope"); - - public async Task ApplyAsync( - IDocumentOperations operations, - IReadOnlyList streams, - CancellationToken ct - ) - { - try - { - foreach (var consumer in consumers) - { - await consumer.ConsumeAsync(operations, streams, ct).ConfigureAwait(false); - } - } - catch (Exception exc) - { - logger.LogError("Error while processing Marten Subscription: {ExceptionMessage}", exc.Message); - throw; - } - } -} - -public interface IMartenEventsConsumer -{ - Task ConsumeAsync( - IDocumentOperations documentOperations, - IReadOnlyList streamActions, - CancellationToken ct - ); -} diff --git a/Core/Commands/InMemoryCommandBus.cs b/Core/Commands/InMemoryCommandBus.cs index a14b2a7b1..ce618956d 100644 --- a/Core/Commands/InMemoryCommandBus.cs +++ b/Core/Commands/InMemoryCommandBus.cs @@ -5,29 +5,18 @@ namespace Core.Commands; -public class InMemoryCommandBus: ICommandBus +public class InMemoryCommandBus( + IServiceProvider serviceProvider, + IActivityScope activityScope, + IAsyncPolicy retryPolicy +): ICommandBus { - private readonly IServiceProvider serviceProvider; - private readonly AsyncPolicy retryPolicy; - private readonly IActivityScope activityScope; - - public InMemoryCommandBus( - IServiceProvider serviceProvider, - IActivityScope activityScope, - AsyncPolicy retryPolicy - ) - { - this.serviceProvider = serviceProvider; - this.retryPolicy = retryPolicy; - this.activityScope = activityScope; - } - public async Task Send(TCommand command, CancellationToken ct = default) where TCommand : notnull { var wasHandled = await TrySend(command, ct).ConfigureAwait(true); - if(!wasHandled) + if (!wasHandled) throw new InvalidOperationException($"Unable to find handler for command '{command.GetType().Name}'"); } diff --git a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs index 2674bc1d7..4dc1f19c1 100644 --- a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs +++ b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs @@ -2,11 +2,13 @@ using Confluent.Kafka; using Marten; using Marten.Events; -using Marten.Events.Projections; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Subscriptions; namespace Helpdesk.Api.Core.Kafka; -public class KafkaProducer(IConfiguration configuration): IProjection +public class KafkaProducer(IConfiguration configuration): SubscriptionBase { private const string DefaultConfigKey = "KafkaProducer"; @@ -14,19 +16,24 @@ public class KafkaProducer(IConfiguration configuration): IProjection configuration.GetRequiredSection(DefaultConfigKey).Get() ?? throw new InvalidOperationException(); - public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streamsActions, - CancellationToken ct) + public override async Task ProcessEventsAsync( + EventRange eventRange, + ISubscriptionController subscriptionController, + IDocumentOperations operations, + CancellationToken ct + ) { - foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events)) + foreach (var @event in eventRange.Events) { - await Publish(@event.Data, ct); + await Publish(subscriptionController, @event, ct); } + return NullChangeListener.Instance; } public void Apply(IDocumentOperations operations, IReadOnlyList streams) => throw new NotImplementedException("Producer should be only used in the AsyncDaemon"); - private async Task Publish(object @event, CancellationToken ct) + private async Task Publish(ISubscriptionController subscriptionController, IEvent @event, CancellationToken ct) { try { @@ -40,11 +47,14 @@ await producer.ProduceAsync(config.Topic, // store event type name in message Key Key = @event.GetType().Name, // serialize event to message Value - Value = JsonSerializer.Serialize(@event) + Value = JsonSerializer.Serialize(@event.Data) }, cts.Token).ConfigureAwait(false); } catch (Exception exc) { + await subscriptionController.ReportCriticalFailureAsync(exc, @event.Sequence); + // TODO: you can also differentiate based on the exception + // await subscriptionController.RecordDeadLetterEventAsync(@event, exc); Console.WriteLine(exc.Message); throw; } diff --git a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs index 37edcbbe4..f38bea3c1 100644 --- a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs +++ b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs @@ -52,7 +52,6 @@ options.Projections.RebuildErrors.SkipSerializationErrors = false; options.Projections.RebuildErrors.SkipUnknownEvents = false; - options.Projections.Add(new KafkaProducer(builder.Configuration), ProjectionLifecycle.Async); options.Projections.Add( new SignalRProducer((IHubContext)sp.GetRequiredService>()), ProjectionLifecycle.Async @@ -65,6 +64,7 @@ }) .OptimizeArtifactWorkflow(TypeLoadMode.Static) .UseLightweightSessions() + .AddSubscriptionWithServices(ServiceLifetime.Singleton) .AddAsyncDaemon(DaemonMode.Solo) // Add Marten/PostgreSQL integration with Wolverine's outbox .IntegrateWithWolverine() diff --git a/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs b/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs index 2674bc1d7..4dc1f19c1 100644 --- a/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs +++ b/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs @@ -2,11 +2,13 @@ using Confluent.Kafka; using Marten; using Marten.Events; -using Marten.Events.Projections; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; +using Marten.Subscriptions; namespace Helpdesk.Api.Core.Kafka; -public class KafkaProducer(IConfiguration configuration): IProjection +public class KafkaProducer(IConfiguration configuration): SubscriptionBase { private const string DefaultConfigKey = "KafkaProducer"; @@ -14,19 +16,24 @@ public class KafkaProducer(IConfiguration configuration): IProjection configuration.GetRequiredSection(DefaultConfigKey).Get() ?? throw new InvalidOperationException(); - public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streamsActions, - CancellationToken ct) + public override async Task ProcessEventsAsync( + EventRange eventRange, + ISubscriptionController subscriptionController, + IDocumentOperations operations, + CancellationToken ct + ) { - foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events)) + foreach (var @event in eventRange.Events) { - await Publish(@event.Data, ct); + await Publish(subscriptionController, @event, ct); } + return NullChangeListener.Instance; } public void Apply(IDocumentOperations operations, IReadOnlyList streams) => throw new NotImplementedException("Producer should be only used in the AsyncDaemon"); - private async Task Publish(object @event, CancellationToken ct) + private async Task Publish(ISubscriptionController subscriptionController, IEvent @event, CancellationToken ct) { try { @@ -40,11 +47,14 @@ await producer.ProduceAsync(config.Topic, // store event type name in message Key Key = @event.GetType().Name, // serialize event to message Value - Value = JsonSerializer.Serialize(@event) + Value = JsonSerializer.Serialize(@event.Data) }, cts.Token).ConfigureAwait(false); } catch (Exception exc) { + await subscriptionController.ReportCriticalFailureAsync(exc, @event.Sequence); + // TODO: you can also differentiate based on the exception + // await subscriptionController.RecordDeadLetterEventAsync(@event, exc); Console.WriteLine(exc.Message); throw; } diff --git a/Sample/Helpdesk/Helpdesk.Api/Core/SignalR/SignalRProducer.cs b/Sample/Helpdesk/Helpdesk.Api/Core/SignalR/SignalRProducer.cs index e1b869255..a76bf78cd 100644 --- a/Sample/Helpdesk/Helpdesk.Api/Core/SignalR/SignalRProducer.cs +++ b/Sample/Helpdesk/Helpdesk.Api/Core/SignalR/SignalRProducer.cs @@ -1,27 +1,35 @@ using Marten; using Marten.Events; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; using Marten.Events.Projections; +using Marten.Subscriptions; using Microsoft.AspNetCore.SignalR; namespace Helpdesk.Api.Core.SignalR; -public class SignalRProducer: IProjection +public class SignalRProducer(IHubContext hubContext): SubscriptionBase { - private readonly IHubContext hubContext; - - public SignalRProducer(IHubContext hubContext) => - this.hubContext = hubContext; - - public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streamsActions, - CancellationToken ct) + public override async Task ProcessEventsAsync( + EventRange eventRange, + ISubscriptionController subscriptionController, + IDocumentOperations operations, + CancellationToken ct + ) { - foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events)) + foreach (var @event in eventRange.Events) { - await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct); + try + { + await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct); + } + catch (Exception exc) + { + // this is fine to put event to dead letter queue, as it's just SignalR notification + await subscriptionController.RecordDeadLetterEventAsync(@event, exc); + } } - } - public void Apply(IDocumentOperations operations, IReadOnlyList streams) => - throw new NotImplementedException("Producer should be only used in the AsyncDaemon"); + return NullChangeListener.Instance; + } } - diff --git a/Sample/Helpdesk/Helpdesk.Api/Program.cs b/Sample/Helpdesk/Helpdesk.Api/Program.cs index 42f395e45..05c04cbd3 100644 --- a/Sample/Helpdesk/Helpdesk.Api/Program.cs +++ b/Sample/Helpdesk/Helpdesk.Api/Program.cs @@ -53,14 +53,11 @@ options.Projections.Add(ProjectionLifecycle.Inline); options.Projections.Add(ProjectionLifecycle.Inline); options.Projections.Add(ProjectionLifecycle.Async); - options.Projections.Add(new KafkaProducer(builder.Configuration), ProjectionLifecycle.Async); - options.Projections.Add( - new SignalRProducer((IHubContext)sp.GetRequiredService>()), - ProjectionLifecycle.Async - ); return options; }) + .AddSubscriptionWithServices(ServiceLifetime.Singleton) + .AddSubscriptionWithServices(ServiceLifetime.Singleton) .OptimizeArtifactWorkflow(TypeLoadMode.Static) .UseLightweightSessions() .AddAsyncDaemon(DaemonMode.Solo); diff --git a/Sample/MartenMeetsElastic/MartenMeetsElastic.Tests/ElasticsearchProjectionTests.cs b/Sample/MartenMeetsElastic/MartenMeetsElastic.Tests/ElasticsearchProjectionTests.cs index 522eecf50..903a985f8 100644 --- a/Sample/MartenMeetsElastic/MartenMeetsElastic.Tests/ElasticsearchProjectionTests.cs +++ b/Sample/MartenMeetsElastic/MartenMeetsElastic.Tests/ElasticsearchProjectionTests.cs @@ -4,6 +4,7 @@ using FluentAssertions; using JasperFx.Core; using Marten; +using Marten.Events; using Marten.Events.Daemon; using MartenMeetsElastic.Projections; using Polly; @@ -52,8 +53,7 @@ public OrderProjectionRaw() Projects(); } - - protected override Task ApplyAsync(ElasticsearchClient client, object[] events) + protected override Task ApplyAsync(ElasticsearchClient client, object[] events, CancellationToken ct) { // (...) TODO return Task.CompletedTask; @@ -96,7 +96,7 @@ public class MeetsElasticTest: MartenMeetsElasticTest { protected override void Options(StoreOptions options) { - options.Projections.Add(elasticClient); + options.Events.AddElasticsearchProjection(elasticClient); } [Fact] @@ -104,11 +104,13 @@ public async Task ShouldProjectEvents_ToElasticsearch() { await DocumentSession.DocumentStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + await StartDaemon(); + await AppendEvents("order1", new OrderInitiated("order1", "ORD/123", new UserInfo("user1", "user1"))); - await StartDaemon(); + //await daemon.Tracker.WaitForShardState(new ShardState("MartenMeetsElastic.Tests.OrderProjection:All", 1), 15.Seconds()); - await daemon.Tracker.WaitForShardState(new ShardState("MartenMeetsElastic.Tests.OrderProjection:All", 1), 15.Seconds()); + await DocumentSession.DocumentStore.WaitForNonStaleProjectionDataAsync(20.Seconds()); await Policy .Handle() diff --git a/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjection.cs b/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjection.cs index cb28203ba..eb51b310c 100644 --- a/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjection.cs +++ b/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjection.cs @@ -4,55 +4,67 @@ using Elastic.Clients.Elasticsearch.QueryDsl; using Marten; using Marten.Events; +using Marten.Events.Daemon; +using Marten.Events.Daemon.Internals; using Marten.Events.Projections; +using Marten.Subscriptions; +using Polly; namespace MartenMeetsElastic.Projections; -public abstract class ElasticsearchProjection: IProjection +public abstract class ElasticsearchProjection: SubscriptionBase { protected abstract string IndexName { get; } public ElasticsearchClient ElasticsearchClient { private get; init; } = default!; + public IAsyncPolicy RetryPolicy { protected get; init; } = Policy.NoOpAsync(); private readonly HashSet handledEventTypes = []; protected void Projects() => handledEventTypes.Add(typeof(TEvent)); - public void Apply(IDocumentOperations operations, IReadOnlyList streams) => - throw new NotImplementedException("We don't want to do 2PC, aye?"); - - public async Task ApplyAsync( + public override async Task ProcessEventsAsync( + EventRange eventRange, + ISubscriptionController subscriptionController, IDocumentOperations operations, - IReadOnlyList streamActions, - CancellationToken cancellation + CancellationToken ct ) { - var existsResponse = await ElasticsearchClient.Indices.ExistsAsync(IndexName, cancellation); - if (!existsResponse.Exists) - await SetupMapping(ElasticsearchClient); - - var events = streamActions.SelectMany(streamAction => streamAction.Events) - .Where(@event => handledEventTypes.Contains(@event.EventType)) - .ToArray(); - - await ApplyAsync(ElasticsearchClient, events); + try + { + //TODO: Add poly! + var existsResponse = await ElasticsearchClient.Indices.ExistsAsync(IndexName, ct); + if (!existsResponse.Exists) + await SetupMapping(ElasticsearchClient); + + var events = eventRange.Events + .Where(@event => handledEventTypes.Contains(@event.EventType)) + .ToArray(); + + await ApplyAsync(ElasticsearchClient, events, ct); + } + catch (Exception exc) + { + await subscriptionController.ReportCriticalFailureAsync(exc); + } + + return NullChangeListener.Instance; } - protected virtual Task ApplyAsync(ElasticsearchClient client, IEvent[] events) => + protected virtual Task ApplyAsync(ElasticsearchClient client, IEvent[] events, CancellationToken ct) => ApplyAsync( client, - events.Where(@event => handledEventTypes.Contains(@event.EventType)).Select(@event => @event.Data).ToArray() + events.Where(@event => handledEventTypes.Contains(@event.EventType)).Select(@event => @event.Data) + .ToArray(), + ct ); - protected virtual Task ApplyAsync(ElasticsearchClient client, object[] events) => + protected virtual Task ApplyAsync(ElasticsearchClient client, object[] events, CancellationToken ct) => Task.CompletedTask; protected virtual Task SetupMapping(ElasticsearchClient client) => client.Indices.CreateAsync(IndexName); - - - public bool EnableDocumentTrackingDuringRebuilds { get; set; } } public abstract class ElasticsearchProjection: @@ -89,38 +101,45 @@ Func apply protected void DocumentId(Func documentId) => getDocumentId = documentId; -protected override async Task ApplyAsync(ElasticsearchClient client, object[] events) -{ - var ids = events.Select(GetDocumentId).ToList(); + protected override Task ApplyAsync(ElasticsearchClient client, object[] events, CancellationToken token) => + RetryPolicy.ExecuteAsync(async ct => + { + var ids = events.Select(GetDocumentId).ToList(); - var searchResponse = await client.SearchAsync(s => s - .Index(IndexName) - .Query(q => q.Ids(new IdsQuery { Values = new Ids(ids) })) - ); + var searchResponse = await client.SearchAsync(s => s + .Index(IndexName) + .Query(q => q.Ids(new IdsQuery { Values = new Ids(ids) })), ct); - var existingDocuments = searchResponse.Documents.ToDictionary(ks => getDocumentId(ks), vs => vs); + var existingDocuments = searchResponse.Documents.ToDictionary(ks => getDocumentId(ks), vs => vs); - var updatedDocuments = events.Select((@event, i) => - Apply(existingDocuments.GetValueOrDefault(ids[i], GetDefault(@event)), @event) - ).ToList(); + var updatedDocuments = events.Select((@event, i) => + Apply(existingDocuments.GetValueOrDefault(ids[i], GetDefault(@event)), @event) + ).ToList(); - var bulkAll = client.BulkAll(updatedDocuments, SetBulkOptions); + var bulkAll = client.BulkAll(updatedDocuments, SetBulkOptions, ct); - bulkAll.Wait(TimeSpan.FromSeconds(5), _ => Console.WriteLine("Data indexed")); -} -protected virtual TDocument GetDefault(object @event) => - ObjectFactory.GetDefaultOrUninitialized(); + bulkAll.Wait(TimeSpan.FromSeconds(5), _ => Console.WriteLine("Data indexed")); + }, token); -private TDocument Apply(TDocument document, object @event) => - projectors[@event.GetType()].Apply(document, @event); + protected virtual TDocument GetDefault(object @event) => + ObjectFactory.GetDefaultOrUninitialized(); -protected virtual void SetBulkOptions(BulkAllRequestDescriptor options) => - options.Index(IndexName); + private TDocument Apply(TDocument document, object @event) => + projectors[@event.GetType()].Apply(document, @event); + + protected virtual void SetBulkOptions(BulkAllRequestDescriptor options) => + options.Index(IndexName) + // .ContinueAfterDroppedDocuments() + // .DroppedDocumentCallback((r, o) => + // { + // Console.WriteLine($"{r} {o}"); + // }) + .BackOffTime(TimeSpan.FromMilliseconds(1)) + .RefreshOnCompleted(); protected override Task SetupMapping(ElasticsearchClient client) => client.Indices.CreateAsync(IndexName); private string GetDocumentId(object @event) => projectors[@event.GetType()].GetId(@event); - } diff --git a/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjectionConfig.cs b/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjectionConfig.cs index 1363d2091..428251b03 100644 --- a/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjectionConfig.cs +++ b/Sample/MartenMeetsElastic/MartenMeetsElastic/Projections/ElasticsearchProjectionConfig.cs @@ -1,16 +1,17 @@ using Elastic.Clients.Elasticsearch; +using Marten; +using Marten.Events; using Marten.Events.Projections; namespace MartenMeetsElastic.Projections; public static class ElasticsearchProjectionConfig { - public static void Add( - this ProjectionOptions projectionOptions, + public static void AddElasticsearchProjection( + this IEventStoreOptions options, ElasticsearchClient client ) where TElasticsearchProjection : ElasticsearchProjection, new() => - projectionOptions.Add( - new TElasticsearchProjection { ElasticsearchClient = client }, - ProjectionLifecycle.Async + options.Subscribe( + new TElasticsearchProjection { ElasticsearchClient = client } ); } diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index fe16f6cc7..e0b6e6594 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -47,11 +47,20 @@ services: ####################################################### elasticsearch: container_name: elastic_search - image: docker.elastic.co/elasticsearch/elasticsearch:7.13.3 + image: docker.elastic.co/elasticsearch/elasticsearch:8.13.2 environment: - discovery.type=single-node - - bootstrap.memory_lock=true + - cluster.name=docker-cluster + - node.name=docker-node - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - xpack.security.enabled=false + - xpack.security.http.ssl.enabled=false + - xpack.security.transport.ssl.enabled=false + - network.host=0.0.0.0 + - http.port=9200 + - transport.host=localhost + - bootstrap.memory_lock=true + - cluster.routing.allocation.disk.threshold_enabled=false ulimits: memlock: soft: -1 diff --git a/docker-compose.yml b/docker-compose.yml index e94e99874..7b562a88d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -68,12 +68,20 @@ services: # Elastic Search ####################################################### elasticsearch: - container_name: elastic_search - image: docker.elastic.co/elasticsearch/elasticsearch:7.13.3 + image: docker.elastic.co/elasticsearch/elasticsearch:8.13.2 environment: - discovery.type=single-node - - bootstrap.memory_lock=true + - cluster.name=docker-cluster + - node.name=docker-node - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + - xpack.security.enabled=false + - xpack.security.http.ssl.enabled=false + - xpack.security.transport.ssl.enabled=false + - network.host=0.0.0.0 + - http.port=9200 + - transport.host=localhost + - bootstrap.memory_lock=true + - cluster.routing.allocation.disk.threshold_enabled=false ulimits: memlock: soft: -1 @@ -87,7 +95,7 @@ services: - es_network kibana: - image: docker.elastic.co/kibana/kibana:7.13.3 + image: docker.elastic.co/kibana/kibana:8.13.2 environment: - ELASTICSEARCH_HOSTS=http://elastic_search:9200 ports: