Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed custom IProjections like KafkaProducer, SignalRProducer, ElasticProjection to subscriptions #245

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions Core.Marten/MartenConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,25 @@ public static IServiceCollection AddMarten(
{
services
.AddScoped<IIdGenerator, MartenIdGenerator>()
.AddMarten(sp => SetStoreOptions(sp, martenConfig, configureOptions))
.AddMarten(options => SetStoreOptions(options, martenConfig, configureOptions))
.UseLightweightSessions()
.ApplyAllDatabaseChangesOnStartup()
//.OptimizeArtifactWorkflow()
.AddAsyncDaemon(martenConfig.DaemonMode);
.AddAsyncDaemon(martenConfig.DaemonMode)
.AddSubscriptionWithServices<MartenEventPublisher>(ServiceLifetime.Scoped);

if (useExternalBus)
services.AddMartenAsyncCommandBus();

return services;
}

private static StoreOptions SetStoreOptions(
IServiceProvider serviceProvider,
private static void SetStoreOptions(
StoreOptions options,
MartenConfig martenConfig,
Action<StoreOptions>? configureOptions = null
)
{
var options = new StoreOptions();
options.Connection(martenConfig.ConnectionString);
options.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;

Expand All @@ -92,22 +92,6 @@ private static StoreOptions SetStoreOptions(
options.Projections.Errors.SkipSerializationErrors = false;
options.Projections.Errors.SkipUnknownEvents = false;

options.Projections.Add(
new MartenSubscription(
new[]
{
new MartenEventPublisher(
serviceProvider,
serviceProvider.GetRequiredService<IActivityScope>(),
serviceProvider.GetRequiredService<ILogger<MartenEventPublisher>>()
)
},
serviceProvider.GetRequiredService<ILogger<MartenSubscription>>()
),
ProjectionLifecycle.Async,
"MartenSubscription"
);

if (martenConfig.UseMetadata)
{
options.Events.MetadataConfig.CausationIdEnabled = true;
Expand All @@ -116,7 +100,5 @@ private static StoreOptions SetStoreOptions(
}

configureOptions?.Invoke(options);

return options;
}
}
97 changes: 53 additions & 44 deletions Core.Marten/Subscriptions/MartenEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,71 @@
using Core.OpenTelemetry;
using Marten;
using Marten.Events;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Events.Projections;
using Marten.Subscriptions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Core.Marten.Subscriptions;

public class MartenEventPublisher: IMartenEventsConsumer
public class MartenEventPublisher(
IServiceProvider serviceProvider,
IActivityScope activityScope,
ILogger<MartenEventPublisher> logger
): SubscriptionBase
{
private readonly IServiceProvider serviceProvider;
private readonly IActivityScope activityScope;
private readonly ILogger<MartenEventPublisher> logger;

public MartenEventPublisher(
IServiceProvider serviceProvider,
IActivityScope activityScope,
ILogger<MartenEventPublisher> logger
)
{
this.serviceProvider = serviceProvider;
this.activityScope = activityScope;
this.logger = logger;
}

public async Task ConsumeAsync(
IDocumentOperations documentOperations,
IReadOnlyList<StreamAction> streamActions,
CancellationToken cancellationToken
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange eventRange,
ISubscriptionController subscriptionController,
IDocumentOperations operations,
CancellationToken token
)
{
foreach (var @event in streamActions.SelectMany(streamAction => streamAction.Events))
var lastProcessed = eventRange.SequenceFloor;
try
{
var parentContext =
TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata);
foreach (var @event in eventRange.Events)
{
var parentContext =
TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata);

await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ProcessEventsAsync)}",
async (_, ct) =>
{
using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();

var eventMetadata = new EventMetadata(
@event.Id.ToString(),
(ulong)@event.Version,
(ulong)@event.Sequence,
parentContext
);

await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ConsumeAsync)}",
async (_, ct) =>
{
using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct)
.ConfigureAwait(false);

var eventMetadata = new EventMetadata(
@event.Id.ToString(),
(ulong)@event.Version,
(ulong)@event.Sequence,
parentContext
);
// TODO: you can also differentiate based on the exception
// await controller.RecordDeadLetterEventAsync(e, ex);
},
new StartActivityOptions
{
Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } },
Parent = parentContext.ActivityContext
},
token
).ConfigureAwait(false);
}

await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct)
.ConfigureAwait(false);
},
new StartActivityOptions
{
Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } },
Parent = parentContext.ActivityContext
},
cancellationToken
).ConfigureAwait(false);
return NullChangeListener.Instance;
}
catch (Exception exc)
{
logger.LogError("Error while processing Marten Subscription: {ExceptionMessage}", exc.Message);
await subscriptionController.ReportCriticalFailureAsync(exc, lastProcessed).ConfigureAwait(false);
throw;
}
}

Expand Down
56 changes: 0 additions & 56 deletions Core.Marten/Subscriptions/MartenSubscription.cs

This file was deleted.

23 changes: 6 additions & 17 deletions Core/Commands/InMemoryCommandBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,18 @@

namespace Core.Commands;

public class InMemoryCommandBus: ICommandBus
public class InMemoryCommandBus(
IServiceProvider serviceProvider,
IActivityScope activityScope,
IAsyncPolicy retryPolicy
): ICommandBus
{
private readonly IServiceProvider serviceProvider;
private readonly AsyncPolicy retryPolicy;
private readonly IActivityScope activityScope;

public InMemoryCommandBus(
IServiceProvider serviceProvider,
IActivityScope activityScope,
AsyncPolicy retryPolicy
)
{
this.serviceProvider = serviceProvider;
this.retryPolicy = retryPolicy;
this.activityScope = activityScope;
}

public async Task Send<TCommand>(TCommand command, CancellationToken ct = default)
where TCommand : notnull
{
var wasHandled = await TrySend(command, ct).ConfigureAwait(true);

if(!wasHandled)
if (!wasHandled)
throw new InvalidOperationException($"Unable to find handler for command '{command.GetType().Name}'");
}

Expand Down
26 changes: 18 additions & 8 deletions Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,38 @@
using Confluent.Kafka;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Subscriptions;

namespace Helpdesk.Api.Core.Kafka;

public class KafkaProducer(IConfiguration configuration): IProjection
public class KafkaProducer(IConfiguration configuration): SubscriptionBase
{
private const string DefaultConfigKey = "KafkaProducer";

private readonly KafkaProducerConfig config =
configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange eventRange,
ISubscriptionController subscriptionController,
IDocumentOperations operations,
CancellationToken ct
)
{
foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events))
foreach (var @event in eventRange.Events)
{
await Publish(@event.Data, ct);
await Publish(subscriptionController, @event, ct);
}
return NullChangeListener.Instance;
}

public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
throw new NotImplementedException("Producer should be only used in the AsyncDaemon");

private async Task Publish(object @event, CancellationToken ct)
private async Task Publish(ISubscriptionController subscriptionController, IEvent @event, CancellationToken ct)
{
try
{
Expand All @@ -40,11 +47,14 @@ await producer.ProduceAsync(config.Topic,
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = JsonSerializer.Serialize(@event)
Value = JsonSerializer.Serialize(@event.Data)
}, cts.Token).ConfigureAwait(false);
}
catch (Exception exc)
{
await subscriptionController.ReportCriticalFailureAsync(exc, @event.Sequence);
// TODO: you can also differentiate based on the exception
// await subscriptionController.RecordDeadLetterEventAsync(@event, exc);
Console.WriteLine(exc.Message);
throw;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Subscriptions;
using Microsoft.AspNetCore.SignalR;

namespace Helpdesk.Api.Core.SignalR;

public class SignalRProducer: IProjection
public class SignalRProducer<THub>(IHubContext<THub> hubContext): SubscriptionBase where THub : Hub
{
private readonly IHubContext hubContext;

public SignalRProducer(IHubContext hubContext) =>
this.hubContext = hubContext;

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange eventRange,
ISubscriptionController subscriptionController,
IDocumentOperations operations,
CancellationToken ct
)
{
foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events))
foreach (var @event in eventRange.Events)
{
await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct);
try
{
await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct);
}
catch (Exception exc)
{
// this is fine to put event to dead letter queue, as it's just SignalR notification
await subscriptionController.RecordDeadLetterEventAsync(@event, exc);
}
}
}

public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
throw new NotImplementedException("Producer should be only used in the AsyncDaemon");
return NullChangeListener.Instance;
}
}

Loading
Loading