Skip to content

Commit

Permalink
Updated MartenEventPublisher to publish events with metadata. Thanks …
Browse files Browse the repository at this point in the history
…to that, Kafka Producer can also include it and KafkaConsumer can get them and republish maintaining CorrelationId and generating new CausationId

Removed usage of IEvent. The marker interface is not needed anymore as MediatR event bus is not used anymore in samples
  • Loading branch information
oskardudycz committed Mar 8, 2022
1 parent 9f0d966 commit 68afb98
Show file tree
Hide file tree
Showing 58 changed files with 175 additions and 166 deletions.
4 changes: 2 additions & 2 deletions Core.ElasticSearch/Projections/ElasticSearchProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Core.ElasticSearch.Projections;

public class ElasticSearchProjection<TEvent, TView> : IEventHandler<EventEnvelope<TEvent>>
where TView : class, IProjection
where TEvent : IEvent
where TEvent : notnull
{
private readonly IElasticClient elasticClient;
private readonly Func<TEvent, string> getId;
Expand Down Expand Up @@ -46,7 +46,7 @@ public static class ElasticSearchProjectionConfig
public static IServiceCollection Project<TEvent, TView>(this IServiceCollection services,
Func<TEvent, string> getId)
where TView : class, IProjection
where TEvent : IEvent
where TEvent : notnull
{
services.AddTransient<IEventHandler<EventEnvelope<TEvent>>>(sp =>
{
Expand Down
1 change: 0 additions & 1 deletion Core.ElasticSearch/Repository/ElasticSearchRepository.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Core.ElasticSearch.Indices;
using Core.Events;
using Nest;
using IAggregate = Core.Aggregates.IAggregate;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Core.Events;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.Events;
using Core.Tracing;
using Core.Tracing.Causation;
using Core.Tracing.Correlation;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Core.Events;
using Core.Tracing;
using Core.Tracing;
using Core.Tracing.Causation;
using Core.Tracing.Correlation;
using Newtonsoft.Json;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
using Core.Events;
using Core.EventStoreDB.Serialization;
using Core.EventStoreDB.Serialization;
using EventStore.Client;

namespace Core.EventStoreDB.Subscriptions;

public record CheckpointStored(string SubscriptionId, ulong? Position, DateTime CheckpointedAt): IEvent;
public record CheckpointStored(string SubscriptionId, ulong? Position, DateTime CheckpointedAt);

public class EventStoreDBSubscriptionCheckpointRepository: ISubscriptionCheckpointRepository
{
Expand Down
6 changes: 5 additions & 1 deletion Core.Kafka/Config.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Core.BackgroundWorkers;
using Core.Events;
using Core.Events.External;
using Core.Kafka.Consumers;
using Core.Kafka.Producers;
Expand All @@ -13,7 +14,10 @@ public static class Config
public static IServiceCollection AddKafkaProducer(this IServiceCollection services)
{
//using TryAdd to support mocking, without that it won't be possible to override in tests
services.TryAddScoped<IExternalEventProducer, KafkaProducer>();
services.TryAddSingleton<IExternalEventProducer, KafkaProducer>();
services.AddSingleton<IEventBus>(sp =>
new EventBusDecoratorWithExternalProducer(sp.GetRequiredService<EventBus>(),
sp.GetRequiredService<IExternalEventProducer>()));
return services;
}

Expand Down
16 changes: 5 additions & 11 deletions Core.Kafka/Consumers/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using Confluent.Kafka;
using Core.Events;
using Core.Events.External;
using Core.Reflection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using IEventBus = Core.Events.IEventBus;
Expand All @@ -12,18 +10,18 @@ namespace Core.Kafka.Consumers;

public class KafkaConsumer: IExternalEventConsumer
{
private readonly IServiceProvider serviceProvider;
private readonly ILogger<KafkaConsumer> logger;
private readonly IEventBus eventBus;
private readonly KafkaConsumerConfig config;

public KafkaConsumer(
IServiceProvider serviceProvider,
ILogger<KafkaConsumer> logger,
IConfiguration configuration
IConfiguration configuration,
IEventBus eventBus
)
{
this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.eventBus = eventBus;

if (configuration == null)
throw new ArgumentNullException(nameof(configuration));
Expand Down Expand Up @@ -73,12 +71,8 @@ private async Task ConsumeNextEvent(IConsumer<string, string> consumer, Cancella
// deserialize event
var @event = JsonConvert.DeserializeObject(message.Message.Value, eventType)!;

using var scope = serviceProvider.CreateScope();
var eventBus =
scope.ServiceProvider.GetRequiredService<IEventBus>();

// publish event to internal event bus
await eventBus.Publish((IEvent)@event, cancellationToken);
await eventBus.Publish(@event, cancellationToken);
}
catch (Exception e)
{
Expand Down
55 changes: 42 additions & 13 deletions Core.Kafka/Producers/KafkaProducer.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using Confluent.Kafka;
using Core.Events;
using Core.Events.External;
using Core.Serialization.Newtonsoft;
using Core.Threading;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;

namespace Core.Kafka.Producers;

Expand All @@ -21,18 +22,46 @@ IConfiguration configuration
config = configuration.GetKafkaProducerConfig();
}

public async Task Publish(IExternalEvent @event, CancellationToken cancellationToken)
public async Task Publish(EventEnvelope @event, CancellationToken ct)
{
using var p = new ProducerBuilder<string, string>(config.ProducerConfig).Build();
await Task.Yield();
// publish event to kafka topic taken from config
await p.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = JsonConvert.SerializeObject(@event)
}, cancellationToken);
try
{
using var p = new ProducerBuilder<string, string>(config.ProducerConfig).Build();
await Task.Yield();
// publish event to kafka topic taken from config

// var result = p.ProduceAsync(config.Topic,
// new Message<string, string>
// {
// // store event type name in message Key
// Key = @event.GetType().Name,
// // serialize event to message Value
// Value = @event.ToJson()
// }, ct).ConfigureAwait(false).GetAwaiter().GetResult();

p.Produce(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = @event.ToJson()
});
p.Flush(ct);
// Console.WriteLine(result);
// var result = p.ProduceAsync(config.Topic,
// new Message<string, string>
// {
// // store event type name in message Key
// Key = @event.GetType().Name,
// // serialize event to message Value
// Value = @event.ToJson()
// }, ct).ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
16 changes: 13 additions & 3 deletions Core.Marten/Subscriptions/MartenEventPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Core.Events;
using Core.Tracing;
using Core.Tracing.Causation;
using Core.Tracing.Correlation;
using Marten;
using Marten.Events;
using Microsoft.Extensions.DependencyInjection;
using IEvent = Core.Events.IEvent;

namespace Core.Marten.Subscriptions;

Expand Down Expand Up @@ -34,9 +36,17 @@ public async Task ConsumeAsync(IDocumentOperations documentOperations, IReadOnly
using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();

if (@event.Data is not IEvent mappedEvent) continue;
var eventMetadata = new EventMetadata(
@event.Id.ToString(),
(ulong)@event.Version,
(ulong)@event.Sequence,
new TraceMetadata(
@event.CorrelationId != null ? new CorrelationId(@event.CorrelationId) : null,
@event.CausationId != null ? new CausationId(@event.CausationId) : null
)
);

await eventBus.Publish(mappedEvent, ct);
await eventBus.Publish(new EventEnvelope(@event.Data, eventMetadata), ct);
}
}
}
3 changes: 2 additions & 1 deletion Core.Serialization/Newtonsoft/SerializationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public static object FromJson(this string json, Type type)
/// <returns>json string</returns>
public static string ToJson(this object obj)
{
return JsonConvert.SerializeObject(obj);
return JsonConvert.SerializeObject(obj,
new JsonSerializerSettings().WithNonDefaultConstructorContractResolver());
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Core.Testing/AggregateExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Core.Testing;

public static class AggregateExtensions
{
public static T? PublishedEvent<T>(this IAggregate aggregate) where T : class, IEvent
public static T? PublishedEvent<T>(this IAggregate aggregate) where T : class
{
return aggregate.DequeueUncommittedEvents().LastOrDefault() as T;
}
Expand Down
9 changes: 4 additions & 5 deletions Core.Testing/ApiFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ public override TestContext CreateTestContext() =>
services.AddSingleton(eventsLog);
services.AddSingleton(typeof(IEventHandler<>), typeof(EventListener<>));
services.AddSingleton<IExternalEventProducer>(externalEventProducer);
services.AddSingleton<IEventBus>(sp =>
new EventBusDecoratorWithExternalProducer(sp.GetRequiredService<EventBus>(),
sp.GetRequiredService<IExternalEventProducer>()));
services.AddSingleton<IExternalCommandBus>(externalCommandBus);
services.AddSingleton<IExternalEventConsumer, DummyExternalEventConsumer>();

}, SetupWebHostBuilder);



public IReadOnlyCollection<TEvent> PublishedExternalEventsOfType<TEvent>() where TEvent : IExternalEvent
{
return externalEventProducer.PublishedEvents.OfType<TEvent>().ToList();
Expand All @@ -39,7 +40,7 @@ public IReadOnlyCollection<TCommand> PublishedExternalCommandOfType<TCommand>()
return externalCommandBus.SentCommands.OfType<TCommand>().ToList();
}

public async Task PublishInternalEvent(IEvent @event, CancellationToken ct = default)
public async Task PublishInternalEvent(object @event, CancellationToken ct = default)
{
using var scope = Server.Host.Services.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
Expand Down Expand Up @@ -78,8 +79,6 @@ public async Task ShouldPublishInternalEventOfType<TEvent>(
retryCount--;
} while (!finished);
}


}

public abstract class ApiWithEventsFixture: ApiFixture
Expand Down
6 changes: 3 additions & 3 deletions Core.Testing/DummyExternalEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ namespace Core.Testing;

public class DummyExternalEventProducer: IExternalEventProducer
{
public IList<IExternalEvent> PublishedEvents { get; } = new List<IExternalEvent>();
public IList<object> PublishedEvents { get; } = new List<object>();

public Task Publish(IExternalEvent @event, CancellationToken ct)
public Task Publish(EventEnvelope @event, CancellationToken ct)
{
PublishedEvents.Add(@event);
PublishedEvents.Add(@event.Data);

return Task.CompletedTask;
}
Expand Down
4 changes: 2 additions & 2 deletions Core.Testing/EventListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace Core.Testing;

public class EventsLog
{
public List<IEvent> PublishedEvents { get; } = new();
public List<object> PublishedEvents { get; } = new();
}

public class EventListener<TEvent>: IEventHandler<TEvent>
where TEvent : IEvent
where TEvent : notnull
{
private readonly EventsLog eventsLog;

Expand Down
3 changes: 1 addition & 2 deletions Core.WebApi/Tracing/TracingMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Core.Events;
using Core.Tracing;
using Core.Tracing;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
Expand Down
8 changes: 3 additions & 5 deletions Core/Aggregates/Aggregate.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using Core.Events;

namespace Core.Aggregates;

public abstract class Aggregate: Aggregate<Guid>, IAggregate
Expand All @@ -12,11 +10,11 @@ public abstract class Aggregate<T>: IAggregate<T> where T : notnull

public int Version { get; protected set; }

[NonSerialized] private readonly Queue<IEvent> uncommittedEvents = new();
[NonSerialized] private readonly Queue<object> uncommittedEvents = new();

public virtual void When(object @event) { }

public IEvent[] DequeueUncommittedEvents()
public object[] DequeueUncommittedEvents()
{
var dequeuedEvents = uncommittedEvents.ToArray();

Expand All @@ -25,7 +23,7 @@ public IEvent[] DequeueUncommittedEvents()
return dequeuedEvents;
}

protected void Enqueue(IEvent @event)
protected void Enqueue(object @event)
{
uncommittedEvents.Enqueue(@event);
}
Expand Down
3 changes: 1 addition & 2 deletions Core/Aggregates/IAggregate.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Core.Events;
using Core.Projections;

namespace Core.Aggregates;
Expand All @@ -12,5 +11,5 @@ public interface IAggregate<out T>: IProjection
T Id { get; }
int Version { get; }

IEvent[] DequeueUncommittedEvents();
object[] DequeueUncommittedEvents();
}
3 changes: 0 additions & 3 deletions Core/Config.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Core.Commands;
using Core.Events;
using Core.Events.External;
using Core.Ids;
using Core.Queries;
using Core.Requests;
Expand All @@ -23,15 +22,13 @@ public static IServiceCollection AddCoreServices(this IServiceCollection service
.AddTracing()
.AddEventBus();

services.TryAddScoped<IExternalEventProducer, NulloExternalEventProducer>();
services.TryAddScoped<IExternalCommandBus, ExternalCommandBus>();

services.TryAddScoped<IIdGenerator, NulloIdGenerator>();

return services;
}


public static IServiceCollection AddTracing(this IServiceCollection services)
{
services.TryAddSingleton<ICorrelationIdFactory, GuidCorrelationIdFactory>();
Expand Down
1 change: 1 addition & 0 deletions Core/Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="RestSharp" Version="107.3.0" />
<PackageReference Include="Scrutor" Version="3.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
1 change: 0 additions & 1 deletion Core/Events/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ public static class Config
public static IServiceCollection AddEventHandler<TEvent, TEventHandler>(
this IServiceCollection services
)
where TEvent : IEvent
where TEventHandler : class, IEventHandler<TEvent>
{
return services
Expand Down
Loading

0 comments on commit 68afb98

Please sign in to comment.