From 35ac52f90581b84ce82d42c9eb2e476904976d43 Mon Sep 17 00:00:00 2001 From: Cloudfy Date: Sun, 1 Dec 2024 20:29:10 +0100 Subject: [PATCH 1/3] First iterative approach and implementation. Adding Abstracts of events to handle light baseline, adding Events as core and RabbitMQ as sample. --- Arcturus.sln | 18 +++++ .../Arcturus.EventBus.Abstracts.csproj | 9 +++ .../IConnection.cs | 13 ++++ .../IEventBusFactory.cs | 8 ++ .../IEventMessage.cs | 3 + .../IEventMessageHandler.cs | 6 ++ src/Arcturus.EventBus.Abstracts/IProcessor.cs | 17 ++++ src/Arcturus.EventBus.Abstracts/IPublisher.cs | 15 ++++ .../ISubscriber.cs | 8 ++ .../OnProcessEventArgs.cs | 26 +++++++ .../Arcturus.EventBus.RabbitMQ.csproj | 20 +++++ .../Internals/EventMessageConverter.cs | 78 +++++++++++++++++++ .../Internals/EventMessageSerializer.cs | 19 +++++ .../RabbitMQConnection.cs | 53 +++++++++++++ .../RabbitMQEventBusFactory.cs | 20 +++++ .../RabbitMQEventBusOptions.cs | 17 ++++ .../RabbitMQProcessor.cs | 65 ++++++++++++++++ .../RabbitMQPublisher.cs | 59 ++++++++++++++ .../ServiceExtensions.cs | 24 ++++++ .../Arcturus.EventBus.csproj | 18 +++++ src/Arcturus.EventBus/Threading/AsyncLock.cs | 27 +++++++ 21 files changed, 523 insertions(+) create mode 100644 src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj create mode 100644 src/Arcturus.EventBus.Abstracts/IConnection.cs create mode 100644 src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs create mode 100644 src/Arcturus.EventBus.Abstracts/IEventMessage.cs create mode 100644 src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs create mode 100644 src/Arcturus.EventBus.Abstracts/IProcessor.cs create mode 100644 src/Arcturus.EventBus.Abstracts/IPublisher.cs create mode 100644 src/Arcturus.EventBus.Abstracts/ISubscriber.cs create mode 100644 src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj create mode 100644 src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs create mode 100644 src/Arcturus.EventBus/Arcturus.EventBus.csproj create mode 100644 src/Arcturus.EventBus/Threading/AsyncLock.cs diff --git a/Arcturus.sln b/Arcturus.sln index 8526d9f..7f9b09b 100644 --- a/Arcturus.sln +++ b/Arcturus.sln @@ -15,6 +15,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcturus.Extensions.ResultO EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.AspNetCore.Endpoints", "src\Arcturus.AspNetCore.Endpoints\Arcturus.AspNetCore.Endpoints.csproj", "{2CF44FC8-346B-44A9-9B80-2D7A16E2B474}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.Abstracts", "src\Arcturus.EventBus.Abstracts\Arcturus.EventBus.Abstracts.csproj", "{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus", "src\Arcturus.EventBus\Arcturus.EventBus.csproj", "{ED23F692-87DC-4753-A67B-D6F94F3BBA00}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.RabbitMQ", "src\Arcturus.EventBus.RabbitMQ\Arcturus.EventBus.RabbitMQ.csproj", "{86F28A25-A376-411F-B67B-FB6FC3742566}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -33,6 +39,18 @@ Global {2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Debug|Any CPU.Build.0 = Debug|Any CPU {2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Release|Any CPU.ActiveCfg = Release|Any CPU {2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Release|Any CPU.Build.0 = Release|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Release|Any CPU.Build.0 = Release|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Release|Any CPU.Build.0 = Release|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.Build.0 = Debug|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.ActiveCfg = Release|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj b/src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj new file mode 100644 index 0000000..a84b2ef --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj @@ -0,0 +1,9 @@ + + + + Arcturus.EventBus.Abstracts + MIT + eventbus,events,arcturus + + + diff --git a/src/Arcturus.EventBus.Abstracts/IConnection.cs b/src/Arcturus.EventBus.Abstracts/IConnection.cs new file mode 100644 index 0000000..5c5e7e1 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IConnection.cs @@ -0,0 +1,13 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IConnection +{ + /// + /// Gets the name of the application. + /// + string? ApplicationId { get; } + /// + /// Gets a value indicating if the connection is connected. + /// + bool IsConnected { get; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs b/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs new file mode 100644 index 0000000..abc014b --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs @@ -0,0 +1,8 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IEventBusFactory +{ + IProcessor CreateProcessor(); + IPublisher CreatePublisher(); + ISubscriber CreateSubscriber(); +} diff --git a/src/Arcturus.EventBus.Abstracts/IEventMessage.cs b/src/Arcturus.EventBus.Abstracts/IEventMessage.cs new file mode 100644 index 0000000..170edb0 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IEventMessage.cs @@ -0,0 +1,3 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IEventMessage { } \ No newline at end of file diff --git a/src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs b/src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs new file mode 100644 index 0000000..3aa3c11 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs @@ -0,0 +1,6 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IEventMessageHandler where TEvent : IEventMessage +{ + Task Handle(TEvent @event, CancellationToken cancellationToken = default); +} diff --git a/src/Arcturus.EventBus.Abstracts/IProcessor.cs b/src/Arcturus.EventBus.Abstracts/IProcessor.cs new file mode 100644 index 0000000..b4ee196 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IProcessor.cs @@ -0,0 +1,17 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IProcessor +{ + /// + /// Event triggered when a message is processed asynchronously. + /// + event Func OnProcessAsync; + + /// + /// Waits for events to be processed. + /// + /// Token to monitor for cancellation requests. + /// A task that represents the asynchronous operation. + Task WaitForEvents( + CancellationToken cancellationToken = default); +} diff --git a/src/Arcturus.EventBus.Abstracts/IPublisher.cs b/src/Arcturus.EventBus.Abstracts/IPublisher.cs new file mode 100644 index 0000000..23a2c76 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IPublisher.cs @@ -0,0 +1,15 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IPublisher +{ + /// + /// + /// + /// + /// + /// + /// + Task Publish( + TEvent @event + , CancellationToken cancellationToken = default) where TEvent : IEventMessage; +} diff --git a/src/Arcturus.EventBus.Abstracts/ISubscriber.cs b/src/Arcturus.EventBus.Abstracts/ISubscriber.cs new file mode 100644 index 0000000..6c7e3c9 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/ISubscriber.cs @@ -0,0 +1,8 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface ISubscriber +{ + Task Subscribe( + Func handler + , CancellationToken cancellationToken = default) where TEvent : IEventMessage; +} diff --git a/src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs b/src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs new file mode 100644 index 0000000..debffed --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs @@ -0,0 +1,26 @@ +namespace Arcturus.EventBus.Abstracts; + +public sealed class OnProcessEventArgs : EventArgs +{ + public OnProcessEventArgs( + string? messageId + , IDictionary? headers + , CancellationToken cancellationToken = default) + { + MessageId = messageId; + Headers = headers; + CancellationToken = cancellationToken; + } + /// + /// Gets a dictionary of headers or null. + /// + public IDictionary? Headers { get; } + /// + /// Gets an id of the message or null. + /// + public string? MessageId { get; } + /// + /// Gets a cancellation token or null. + /// + public CancellationToken? CancellationToken { get; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj b/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj new file mode 100644 index 0000000..f7e7929 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj @@ -0,0 +1,20 @@ + + + + Arcturus.EventBus.RabbitMQ + MIT + eventbus,events,arcturus,rabbitmq + + + + + + + + + + + + + + diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs new file mode 100644 index 0000000..5992de1 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs @@ -0,0 +1,78 @@ +using Arcturus.EventBus.Abstracts; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Arcturus.EventBus.RabbitMQ.Internals; + +internal sealed class EventMessageConverter : JsonConverter +{ + private const string DiscriminatorPropertyName = "$eventType"; + public override IEventMessage Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + using var document = JsonDocument.ParseValue(ref reader); + var root = document.RootElement; + + // Get the discriminator + if (!root.TryGetProperty(DiscriminatorPropertyName, out var typeProperty)) + { + throw new JsonException($"Missing discriminator property '{DiscriminatorPropertyName}'."); + } + + var typeName = typeProperty.GetString(); + if (typeName == null) + { + throw new JsonException("Discriminator property value is null."); + } + + // Resolve the type and deserialize + // var type = Type.GetType(typeName); + + var type = AppDomain.CurrentDomain + .GetAssemblies() + .Select(a => a.GetType(typeName)) + .Where(_ => _ is not null) + .FirstOrDefault(); + + if (type == null) + { + throw new JsonException($"Cannot resolve type '{typeName}'."); + } + + var json = root.GetRawText(); + return (IEventMessage)JsonSerializer.Deserialize(json, type, options)!; + } + + public override void Write(Utf8JsonWriter writer, IEventMessage value, JsonSerializerOptions options) + { + if (value == null) + { + writer.WriteNullValue(); + return; + }// Get the actual type of the object + var type = value.GetType(); + writer.WriteStartObject(); + + // Write the discriminator + writer.WriteString(DiscriminatorPropertyName, type.FullName); + + // Serialize the object properties + var properties = type.GetProperties(); + foreach (var property in properties) + { + var propertyValue = property.GetValue(value); + if (propertyValue != null) + { + writer.WritePropertyName(property.Name); + JsonSerializer.Serialize(writer, propertyValue, propertyValue?.GetType() ?? typeof(object), options); + } + else if (propertyValue == null && + options.DefaultIgnoreCondition != JsonIgnoreCondition.WhenWritingNull) + { + writer.WritePropertyName(property.Name); + writer.WriteNullValue(); + } + } + + writer.WriteEndObject(); + } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs new file mode 100644 index 0000000..fd06154 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs @@ -0,0 +1,19 @@ +using Arcturus.EventBus.Abstracts; +using System.Text.Json; + +namespace Arcturus.EventBus.RabbitMQ.Internals; + +internal static class EventMessageSerializer +{ + private static JsonSerializerOptions _options = new() + { + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull + , WriteIndented = false + , Converters = { new EventMessageConverter() } + }; + + internal static string Serialize(IEventMessage @event) + => JsonSerializer.Serialize(@event, _options); + internal static IEventMessage Deserialize(string message) + => JsonSerializer.Deserialize(message, _options)!; +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs new file mode 100644 index 0000000..6fd41dc --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs @@ -0,0 +1,53 @@ +using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.Threading; +using RMQ = RabbitMQ.Client; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQConnection : IConnection +{ + private string? _applicationId { get; } + private string _clientName { get; } + private string _connectionHostName { get; } + + private RMQ.IChannel? _channel; + private RMQ.IConnection? _connection; + private bool _isConnected = false; + public string? ApplicationId => _applicationId; + public bool IsConnected => _isConnected; + + private readonly AsyncLock _asyncLock = new(); + + internal RMQ.IChannel Channel => _channel!; + + internal RabbitMQConnection(RabbitMQEventBusOptions options) + { + _applicationId = options.ApplicationId; + _clientName = options.ClientName ?? Environment.MachineName; + _connectionHostName = options.HostName ?? "localhost"; + } + + internal async Task Connect(CancellationToken cancellationToken = default) + { + var factory = new RMQ.ConnectionFactory { HostName = _connectionHostName }; + + using (await _asyncLock.LockAsync(cancellationToken)) + { + _connection = await factory.CreateConnectionAsync(_clientName, cancellationToken); + _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + + _isConnected = true; + } + } + internal Task EnsureConnected(CancellationToken cancellationToken = default) + { + if (!IsConnected) + return Connect(cancellationToken); + return Task.CompletedTask; + } + public void Dispose() + { + _channel?.Dispose(); + _connection?.Dispose(); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs new file mode 100644 index 0000000..bc55002 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs @@ -0,0 +1,20 @@ +using Arcturus.EventBus.Abstracts; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQEventBusFactory( + IConnection connection) : IEventBusFactory +{ + public IPublisher CreatePublisher() + { + return new RabbitMQPublisher(connection); + } + public IProcessor CreateProcessor() + { + return new RabbitMQProcessor(connection); + } + public ISubscriber CreateSubscriber() + { + throw new NotImplementedException(); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs new file mode 100644 index 0000000..c119c7b --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs @@ -0,0 +1,17 @@ +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQEventBusOptions +{ + /// + /// Gets or sets an application id. + /// + public string? ApplicationId { get; set; } + /// + /// Gets or sets a client name. + /// + public string? ClientName { get; set; } + /// + /// Gets or sets a host name. Defaults to localhost. + /// + public string? HostName { get; set; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs new file mode 100644 index 0000000..34ebcaf --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs @@ -0,0 +1,65 @@ +using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.RabbitMQ.Internals; +using RabbitMQ.Client.Events; +using RabbitMQ.Client; +using System.Text; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQProcessor : IProcessor +{ + public event Func? OnProcessAsync; + private readonly RabbitMQConnection _connection; + private readonly string queueName = "task_queue"; + + internal RabbitMQProcessor(Abstracts.IConnection connection) + { + if (connection is not RabbitMQConnection) + throw new NotImplementedException(); + + _connection = (RabbitMQConnection)connection; + } + + public async Task WaitForEvents(CancellationToken cancellationToken = default) + { + await _connection.EnsureConnected(cancellationToken); + + // consumer + await _connection.Channel.QueueDeclareAsync( + queue: queueName + , durable: true + , exclusive: false + , autoDelete: false + , arguments: null + , cancellationToken: cancellationToken); + + await _connection.Channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken); + // Console.WriteLine(" [*] Waiting for messages."); + + var consumer = new AsyncEventingBasicConsumer(_connection.Channel); + consumer.ReceivedAsync += async (model, ea) => + { + byte[] body = ea.Body.ToArray(); + var message = Encoding.UTF8.GetString(body); + + var @event = EventMessageSerializer.Deserialize(message); + + if (OnProcessAsync is not null) + await OnProcessAsync.Invoke( + @event + , new OnProcessEventArgs( + ea.BasicProperties.MessageId + , ea.BasicProperties.Headers + , ea.CancellationToken)); + + await _connection.Channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false, cancellationToken); + }; + + await _connection.Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer, cancellationToken); + // await _connection.Channel.BasicConsumeAsync(queueName, false, null, false, false, null, consumer, cancellationToken); + while (!cancellationToken.IsCancellationRequested) + { + cancellationToken.WaitHandle.WaitOne(100); + } + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs new file mode 100644 index 0000000..8d91b02 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs @@ -0,0 +1,59 @@ +using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.RabbitMQ.Internals; +using Polly; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using System.Net.Sockets; +using System.Text; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQPublisher : IPublisher +{ + private readonly RabbitMQConnection _connection; + private readonly string queueName = "task_queue"; + internal RabbitMQPublisher(Abstracts.IConnection connection) + { + if (connection is not RabbitMQConnection) + throw new NotImplementedException(); + + _connection = (RabbitMQConnection)connection; + } + + public async Task Publish( + TEvent @event + , CancellationToken cancellationToken = default) where TEvent : IEventMessage + { + // retry policy + var policy = Policy + .Handle() + .Or() + .Or() + .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (exception, timeSpan) => { + // _logger.LogWarning(exception, "Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}.", @event.Id, $"{timeSpan.TotalSeconds:n1}", exception.Message); + }); + + await policy.Execute(async () => { + await _connection.EnsureConnected(cancellationToken); + + await _connection.Channel.QueueDeclareAsync( + queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null, cancellationToken: cancellationToken); + + var message = EventMessageSerializer.Serialize(@event); + var body = Encoding.UTF8.GetBytes(message); + + var properties = new BasicProperties + { + Persistent = true + , AppId = _connection.ApplicationId + , CorrelationId = null + , Headers = null + , MessageId = Guid.NewGuid().ToString() + }; + + await _connection.Channel.BasicPublishAsync( + exchange: string.Empty, routingKey: queueName, mandatory: true, basicProperties: properties, body: body); + }); + + } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs b/src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs new file mode 100644 index 0000000..6b33149 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs @@ -0,0 +1,24 @@ +using Arcturus.EventBus.Abstracts; +using Microsoft.Extensions.DependencyInjection; + +namespace Arcturus.EventBus.RabbitMQ; + +public static class ServicesExtensions +{ + public static IServiceCollection AddRabbitMQEventBus( + this IServiceCollection services + , Action? options = null) + { + var currentOptions = new RabbitMQEventBusOptions(); + if (options is not null) + { + options(currentOptions); + } + + services.AddSingleton( + (sp) => { return new RabbitMQConnection(currentOptions); }); + services.AddSingleton(); + + return services; + } +} diff --git a/src/Arcturus.EventBus/Arcturus.EventBus.csproj b/src/Arcturus.EventBus/Arcturus.EventBus.csproj new file mode 100644 index 0000000..5adda9b --- /dev/null +++ b/src/Arcturus.EventBus/Arcturus.EventBus.csproj @@ -0,0 +1,18 @@ + + + + Arcturus.EventBus + MIT + eventbus,events,arcturus + + + + + + + + + + + + diff --git a/src/Arcturus.EventBus/Threading/AsyncLock.cs b/src/Arcturus.EventBus/Threading/AsyncLock.cs new file mode 100644 index 0000000..3bb9e26 --- /dev/null +++ b/src/Arcturus.EventBus/Threading/AsyncLock.cs @@ -0,0 +1,27 @@ +namespace Arcturus.EventBus.Threading; + +public sealed class AsyncLock +{ + private readonly SemaphoreSlim _semaphore = new(1, 1); + + public async Task LockAsync(CancellationToken cancellationToken = default) + { + await _semaphore.WaitAsync(cancellationToken); + return new Releaser(_semaphore); + } + + private sealed class Releaser : IDisposable + { + private readonly SemaphoreSlim _semaphore; + + internal Releaser(SemaphoreSlim semaphore) + { + _semaphore = semaphore; + } + + public void Dispose() + { + _semaphore.Release(); + } + } +} From d25de46a838065af3bd16e4a6f97660998ffa649 Mon Sep 17 00:00:00 2001 From: Cloudfy Date: Mon, 2 Dec 2024 21:25:02 +0100 Subject: [PATCH 2/3] Add optional queue names and type resolver improvements Updated IEventBusFactory to allow optional queue names for CreateProcessor and CreatePublisher methods. Introduced DefaultEventMessageTypeResolver for type resolution and caching. Updated EventMessageConverter to use the new resolver. Added event handler in RabbitMQConnection to set _isConnected to false on shutdown. Enhanced RabbitMQEventBusFactory, RabbitMQProcessor, and RabbitMQPublisher to support optional queue names, replacing hardcoded names with a default fallback. --- .../IEventBusFactory.cs | 4 +-- .../DefaultEventMessageTypeResolver.cs | 34 +++++++++++++++++++ .../Internals/EventMessageConverter.cs | 29 +++++----------- .../RabbitMQConnection.cs | 6 +++- .../RabbitMQEventBusFactory.cs | 8 ++--- .../RabbitMQProcessor.cs | 9 ++--- .../RabbitMQPublisher.cs | 12 ++++--- 7 files changed, 66 insertions(+), 36 deletions(-) create mode 100644 src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs diff --git a/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs b/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs index abc014b..bb854ed 100644 --- a/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs +++ b/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs @@ -2,7 +2,7 @@ public interface IEventBusFactory { - IProcessor CreateProcessor(); - IPublisher CreatePublisher(); + IProcessor CreateProcessor(string? queue = null); + IPublisher CreatePublisher(string? queue = null); ISubscriber CreateSubscriber(); } diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs new file mode 100644 index 0000000..cf04e0e --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs @@ -0,0 +1,34 @@ +namespace Arcturus.EventBus.RabbitMQ.Internals; + +/// +/// Default type resolver using the AppDomain. +/// +internal class DefaultEventMessageTypeResolver +{ + private readonly Dictionary _reflectionCache = []; + + /// + /// Resolves to a . + /// + /// Required. Name of type to resolve. + /// or null. + internal Type? ResolveType(string typeName) + { + ArgumentNullException.ThrowIfNull(typeName, nameof(typeName)); + + if (_reflectionCache.TryGetValue(typeName, out var type)) + return type; + + type = AppDomain.CurrentDomain + .GetAssemblies() + .Select(a => a.GetType(typeName)) + .Where(_ => _ is not null) + .FirstOrDefault(); + if (type is not null) + { + _reflectionCache.Add(typeName, type); + return type; + } + return null; + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs index 5992de1..53f760e 100644 --- a/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs @@ -7,6 +7,13 @@ namespace Arcturus.EventBus.RabbitMQ.Internals; internal sealed class EventMessageConverter : JsonConverter { private const string DiscriminatorPropertyName = "$eventType"; + private readonly DefaultEventMessageTypeResolver _typeResolver; + + internal EventMessageConverter() + { + _typeResolver = new DefaultEventMessageTypeResolver(); + } + public override IEventMessage Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { using var document = JsonDocument.ParseValue(ref reader); @@ -18,26 +25,8 @@ public override IEventMessage Read(ref Utf8JsonReader reader, Type typeToConvert throw new JsonException($"Missing discriminator property '{DiscriminatorPropertyName}'."); } - var typeName = typeProperty.GetString(); - if (typeName == null) - { - throw new JsonException("Discriminator property value is null."); - } - - // Resolve the type and deserialize - // var type = Type.GetType(typeName); - - var type = AppDomain.CurrentDomain - .GetAssemblies() - .Select(a => a.GetType(typeName)) - .Where(_ => _ is not null) - .FirstOrDefault(); - - if (type == null) - { - throw new JsonException($"Cannot resolve type '{typeName}'."); - } - + var typeName = typeProperty.GetString() ?? throw new JsonException("Discriminator property value is null."); + var type = _typeResolver.ResolveType(typeName) ?? throw new JsonException($"Cannot resolve type '{typeName}'."); var json = root.GetRawText(); return (IEventMessage)JsonSerializer.Deserialize(json, type, options)!; } diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs index 6fd41dc..18f6a11 100644 --- a/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs @@ -9,7 +9,6 @@ public sealed class RabbitMQConnection : IConnection private string? _applicationId { get; } private string _clientName { get; } private string _connectionHostName { get; } - private RMQ.IChannel? _channel; private RMQ.IConnection? _connection; private bool _isConnected = false; @@ -34,6 +33,11 @@ internal async Task Connect(CancellationToken cancellationToken = default) using (await _asyncLock.LockAsync(cancellationToken)) { _connection = await factory.CreateConnectionAsync(_clientName, cancellationToken); + _connection.ConnectionShutdownAsync += (sender, args) => + { + _isConnected = false; + return Task.CompletedTask; + }; _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); _isConnected = true; diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs index bc55002..d1e1c4b 100644 --- a/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs @@ -5,13 +5,13 @@ namespace Arcturus.EventBus.RabbitMQ; public sealed class RabbitMQEventBusFactory( IConnection connection) : IEventBusFactory { - public IPublisher CreatePublisher() + public IPublisher CreatePublisher(string? queue = null) { - return new RabbitMQPublisher(connection); + return new RabbitMQPublisher(connection, queue); } - public IProcessor CreateProcessor() + public IProcessor CreateProcessor(string? queue = null) { - return new RabbitMQProcessor(connection); + return new RabbitMQProcessor(connection, queue); } public ISubscriber CreateSubscriber() { diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs index 34ebcaf..53a7143 100644 --- a/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs @@ -10,14 +10,15 @@ public sealed class RabbitMQProcessor : IProcessor { public event Func? OnProcessAsync; private readonly RabbitMQConnection _connection; - private readonly string queueName = "task_queue"; + private readonly string _queueName; - internal RabbitMQProcessor(Abstracts.IConnection connection) + internal RabbitMQProcessor(Abstracts.IConnection connection, string? queueName = null) { if (connection is not RabbitMQConnection) throw new NotImplementedException(); _connection = (RabbitMQConnection)connection; + _queueName = queueName ?? "default_queue"; } public async Task WaitForEvents(CancellationToken cancellationToken = default) @@ -26,7 +27,7 @@ public async Task WaitForEvents(CancellationToken cancellationToken = default) // consumer await _connection.Channel.QueueDeclareAsync( - queue: queueName + queue: _queueName , durable: true , exclusive: false , autoDelete: false @@ -55,7 +56,7 @@ await OnProcessAsync.Invoke( await _connection.Channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false, cancellationToken); }; - await _connection.Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer, cancellationToken); + await _connection.Channel.BasicConsumeAsync(_queueName, autoAck: false, consumer: consumer, cancellationToken); // await _connection.Channel.BasicConsumeAsync(queueName, false, null, false, false, null, consumer, cancellationToken); while (!cancellationToken.IsCancellationRequested) { diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs index 8d91b02..26b5a68 100644 --- a/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs @@ -11,13 +11,15 @@ namespace Arcturus.EventBus.RabbitMQ; public sealed class RabbitMQPublisher : IPublisher { private readonly RabbitMQConnection _connection; - private readonly string queueName = "task_queue"; - internal RabbitMQPublisher(Abstracts.IConnection connection) + private readonly string _queueName; + + internal RabbitMQPublisher(Abstracts.IConnection connection, string? queueName = null) { if (connection is not RabbitMQConnection) - throw new NotImplementedException(); + throw new NotImplementedException($"Requires RabbitMQConnection"); _connection = (RabbitMQConnection)connection; + _queueName = queueName ?? "default_queue"; } public async Task Publish( @@ -37,7 +39,7 @@ await policy.Execute(async () => { await _connection.EnsureConnected(cancellationToken); await _connection.Channel.QueueDeclareAsync( - queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null, cancellationToken: cancellationToken); + queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null, cancellationToken: cancellationToken); var message = EventMessageSerializer.Serialize(@event); var body = Encoding.UTF8.GetBytes(message); @@ -52,7 +54,7 @@ await _connection.Channel.QueueDeclareAsync( }; await _connection.Channel.BasicPublishAsync( - exchange: string.Empty, routingKey: queueName, mandatory: true, basicProperties: properties, body: body); + exchange: string.Empty, routingKey: _queueName, mandatory: true, basicProperties: properties, body: body); }); } From c6affb814b511f47f833c497d08e1f1e4b6c795e Mon Sep 17 00:00:00 2001 From: Cloudfy Date: Tue, 3 Dec 2024 14:04:16 +0100 Subject: [PATCH 3/3] Add OpenTelemetry support to EventBus projects Added new Arcturus.EventBus.OpenTelemetry project with necessary references. Updated Arcturus.sln to include the new project. Enhanced README.md with descriptions for EventBus projects. Integrated OpenTelemetry instrumentation into Arcturus.EventBus and Arcturus.EventBus.RabbitMQ. Modified RabbitMQPublisher for tracing. Added EventBusActivitySource for activity creation and context propagation. Introduced IDiagnosticProperties interface. Updated global usings for diagnostics. Added OpenTelemetry package to Arcturus.EventBus.RabbitMQ.csproj. Implemented context extraction and injection methods. --- Arcturus.sln | 6 + docs/README.md | 3 + .../Arcturus.EventBus.OpenTelemetry.csproj | 17 ++ .../TraceProviderBuilderExtensions.cs | 57 +++++++ .../Arcturus.EventBus.RabbitMQ.csproj | 1 + .../OpenTelemetry/OpenTelemetryExtensions.cs | 56 +++++++ .../RabbitMQPublisher.cs | 9 +- src/Arcturus.EventBus.RabbitMQ/Usings.cs | 2 + .../Diagnostics/EventBusActivitySource.cs | 153 ++++++++++++++++++ .../Diagnostics/IDiagnosticProperties.cs | 9 ++ src/Arcturus.EventBus/Usings.cs | 3 + 11 files changed, 314 insertions(+), 2 deletions(-) create mode 100644 src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj create mode 100644 src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs create mode 100644 src/Arcturus.EventBus.RabbitMQ/Usings.cs create mode 100644 src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs create mode 100644 src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs create mode 100644 src/Arcturus.EventBus/Usings.cs diff --git a/Arcturus.sln b/Arcturus.sln index 7f9b09b..d72824a 100644 --- a/Arcturus.sln +++ b/Arcturus.sln @@ -21,6 +21,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus", "src\Ar EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.RabbitMQ", "src\Arcturus.EventBus.RabbitMQ\Arcturus.EventBus.RabbitMQ.csproj", "{86F28A25-A376-411F-B67B-FB6FC3742566}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.OpenTelemetry", "src\Arcturus.EventBus.OpenTelemetry\Arcturus.EventBus.OpenTelemetry.csproj", "{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -51,6 +53,10 @@ Global {86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.Build.0 = Debug|Any CPU {86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.ActiveCfg = Release|Any CPU {86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.Build.0 = Release|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/docs/README.md b/docs/README.md index a31b271..0b9af12 100644 --- a/docs/README.md +++ b/docs/README.md @@ -10,6 +10,9 @@ Arcturus is a ready-to-use framework designed for building modern cloud and dist | Arcturus.ResultObjects | Result objects for not relying on exceptions as code flow. | Arcturus.ResultObjects | | Arcturus.Extensions.ResultObjects.AspNetCore | Extensions for using Result objects in ASP.NET Core. | Arcturus.Extensions.ResultObjects.AspNetCore | | Arcturus.AspNetCore.Endpoints | Enable quick and easy to use API endpoints. | Arcturus.AspNetCore.Endpoints | +| Arcturus.EventBus | Event bus for publish/subscribe pattern. | Arcturus.EventBus | +| Arcturus.EventBus.RabbitMQ | RabbitMQ implementation for Arcturus.EventBus. | Arcturus.EventBus.RabbitMQ | +| Arcturus.EventBus.Abstracts | Abstracts for EventBus implementation | Arcturus.EventBus.Abstracts | ## Installation Using .NET CLI diff --git a/src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj b/src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj new file mode 100644 index 0000000..d06a8d0 --- /dev/null +++ b/src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj @@ -0,0 +1,17 @@ + + + + Arcturus.EventBus.OpenTelemetry + MIT + eventbus,events,arcturus,opentelemetry + + + + + + + + + + + diff --git a/src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs b/src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs new file mode 100644 index 0000000..5865ffb --- /dev/null +++ b/src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs @@ -0,0 +1,57 @@ +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; +using OpenTelemetry; +using System.Diagnostics; +using System.Text; +using Arcturus.EventBus.Diagnostics; + +namespace Arcturus.EventBus.OpenTelemetry; + +public static class TraceProviderBuilderExtensions +{ + public static TracerProviderBuilder AddEventBusInstrumentation(this TracerProviderBuilder builder) + { + EventBusActivitySource.ContextExtractor = OpenTelemetryContextExtractor; + EventBusActivitySource.ContextInjector = OpenTelemetryContextInjector; + builder.AddSource(EventBusActivitySource.ActivityName); + return builder; + } + + private static ActivityContext OpenTelemetryContextExtractor(IDiagnosticProperties props) + { + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + Baggage.Current = parentContext.Baggage; + return parentContext.ActivityContext; + } + + private static IEnumerable OpenTelemetryContextGetter(IDictionary? carrier, string key) + { + try + { + if (carrier is not null && carrier.TryGetValue(key, out object? value) && value is byte[] bytes) + { + return [Encoding.UTF8.GetString(bytes)]; + } + } + catch (Exception) + { + //this.logger.LogError(ex, "Failed to extract trace context."); + } + + return []; + } + + private static void OpenTelemetryContextInjector(Activity activity, IDictionary? props) + { + // Inject the current Activity's context into the message headers. + Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); + } + + private static void OpenTelemetryContextSetter(IDictionary? carrier, string key, string value) + { + if (carrier is null) + return; + carrier[key] = Encoding.UTF8.GetBytes(value); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj b/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj index f7e7929..dccf0b6 100644 --- a/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj +++ b/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj @@ -10,6 +10,7 @@ + diff --git a/src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs b/src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs new file mode 100644 index 0000000..c9c5cb4 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs @@ -0,0 +1,56 @@ +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; +using RabbitMQ.Client; +using System.Text; + +namespace Arcturus.EventBus.RabbitMQ.OpenTelemetry; + +public static class OpenTelemetryExtensions +{ + public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder) + { + RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor; + RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector; + builder.AddSource("RabbitMQ.Client.*"); + return builder; + } + + private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props) + { + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + Baggage.Current = parentContext.Baggage; + return parentContext.ActivityContext; + } + + private static IEnumerable OpenTelemetryContextGetter(IDictionary? carrier, string key) + { + try + { + if (carrier is not null && carrier.TryGetValue(key, out object? value) && value is byte[] bytes) + { + return [Encoding.UTF8.GetString(bytes)]; + } + } + catch (Exception) + { + //this.logger.LogError(ex, "Failed to extract trace context."); + } + + return []; + } + + private static void OpenTelemetryContextInjector(Activity activity, IDictionary? props) + { + // Inject the current Activity's context into the message headers. + Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); + } + + private static void OpenTelemetryContextSetter(IDictionary? carrier, string key, string value) + { + if (carrier is null) + return; + carrier[key] = Encoding.UTF8.GetBytes(value); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs index 26b5a68..15ce8fd 100644 --- a/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs @@ -1,4 +1,5 @@ using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.Diagnostics; using Arcturus.EventBus.RabbitMQ.Internals; using Polly; using RabbitMQ.Client; @@ -21,7 +22,7 @@ internal RabbitMQPublisher(Abstracts.IConnection connection, string? queueName = _connection = (RabbitMQConnection)connection; _queueName = queueName ?? "default_queue"; } - + public async Task Publish( TEvent @event , CancellationToken cancellationToken = default) where TEvent : IEventMessage @@ -44,6 +45,11 @@ await _connection.Channel.QueueDeclareAsync( var message = EventMessageSerializer.Serialize(@event); var body = Encoding.UTF8.GetBytes(message); + using Activity? sendActivity = EventBusActivitySource.PublisherHasListeners + ? EventBusActivitySource.Publish(@event.GetType().Name) + : default; + //sendActivity?.SetTag("eventbus.message.name", @event.GetType().Name); + var properties = new BasicProperties { Persistent = true @@ -56,6 +62,5 @@ await _connection.Channel.QueueDeclareAsync( await _connection.Channel.BasicPublishAsync( exchange: string.Empty, routingKey: _queueName, mandatory: true, basicProperties: properties, body: body); }); - } } \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/Usings.cs b/src/Arcturus.EventBus.RabbitMQ/Usings.cs new file mode 100644 index 0000000..a335706 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Usings.cs @@ -0,0 +1,2 @@ +global using System.Diagnostics; +global using System.Reflection; diff --git a/src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs b/src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs new file mode 100644 index 0000000..c9919b1 --- /dev/null +++ b/src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs @@ -0,0 +1,153 @@ +namespace Arcturus.EventBus.Diagnostics; + +public static class EventBusActivitySource +{ + private static readonly string AssemblyVersion = typeof(EventBusActivitySource).Assembly + .GetCustomAttribute() + ?.InformationalVersion ?? ""; + private static readonly ActivitySource _publisherSource = new (PublisherSourceName, AssemblyVersion); + private static readonly ActivitySource _subscriberSource = new (SubscriberSourceName, AssemblyVersion); + + internal const string MessageId = "eventbus.message.id"; + internal const string MessageConversationId = "eventbus.message.conversation_id"; + internal const string MessagingSystem = "EventBus"; + internal const string MessagingOperationTypePublish = "publish"; + internal const string MessagingOperationTypeProcess = "process"; + internal const string MessagingOperationTypeReceive = "receive"; + internal const string MessagingOperationType = "eventbus.message.operation"; + + public static bool PublisherHasListeners => _publisherSource.HasListeners(); + + internal static readonly IEnumerable> CreationTags = + [ + new KeyValuePair(MessagingSystem, "rabbitmq") + ]; + + public static Activity? Publish(string eventMessageName, ActivityContext linkedContext = default) + { + if (!_publisherSource.HasListeners()) + { + return null; + } + Activity? activity = linkedContext == default + ? _publisherSource.StartEventBusActivity( + MessagingOperationTypePublish, // UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, + ActivityKind.Producer) + : _publisherSource.StartLinkedEventBusActivity( + MessagingOperationTypePublish, // UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, + ActivityKind.Producer, linkedContext); + if (activity != null && activity.IsAllDataRequested) + { + PopulateMessagingTags(MessagingOperationTypePublish, activity); + } + + return activity; + + } + + private static Activity? StartEventBusActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext parentContext = default) + { + return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start(); + } + + private static Activity? StartLinkedEventBusActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext linkedContext = default, ActivityContext parentContext = default) + { + return source.CreateActivity(name, kind, parentContext: parentContext, + links: new[] { new ActivityLink(linkedContext) }, idFormat: ActivityIdFormat.W3C, + tags: CreationTags) + ?.Start(); + } + + public const string PublisherSourceName = "Arcturus.EventBus.Publisher"; + public const string SubscriberSourceName = "Arcturus.EventBus.Subscriber"; + public const string ActivityName = "Arcturus.EventBus.*"; + + public static Action> ContextInjector { get; set; } = DefaultContextInjector; + public static Func ContextExtractor { get; set; } = DefaultContextExtractor; + + //private static void PopulateMessagingTags(string operation, Activity activity) + //{ + // PopulateMessagingTags(operation, activity); + + // if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId)) + // { + // activity.SetTag(MessageConversationId, readOnlyBasicProperties.CorrelationId); + // } + + // if (!string.IsNullOrEmpty(readOnlyBasicProperties.MessageId)) + // { + // activity.SetTag(MessageId, readOnlyBasicProperties.MessageId); + // } + //} + private static void PopulateMessagingTags(string operation, Activity activity) + { + activity + .SetTag(MessagingOperationType, operation); + //.SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange) + //.SetTag(MessagingDestinationRoutingKey, routingKey) + //.SetTag(MessagingBodySize, bodySize); + + //if (deliveryTag > 0) + //{ + // activity.SetTag(RabbitMQDeliveryTag, deliveryTag); + //} + } + + private static void DefaultContextInjector(Activity sendActivity, IDictionary props) + { + DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter); + } + private static ActivityContext DefaultContextExtractor(IDiagnosticProperties props) + { + if (props.Headers == null) + { + return default; + } + + bool hasHeaders = false; + foreach (string header in DistributedContextPropagator.Current.Fields) + { + if (props.Headers.ContainsKey(header)) + { + hasHeaders = true; + break; + } + } + + + if (!hasHeaders) + { + return default; + } + + DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string? traceParent, out string? traceState); + return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default; + } + + private static void DefaultContextSetter(object? carrier, string name, string value) + { + if (!(carrier is IDictionary carrierDictionary)) + { + return; + } + + // Only propagate headers if they haven't already been set + carrierDictionary[name] = value; + } + private static void DefaultContextGetter(object? carrier, string name, out string? value, out IEnumerable? values) + { + if (carrier is IDictionary carrierDict && + carrierDict.TryGetValue(name, out object? propsVal) && propsVal is byte[] bytes) + { + value = Encoding.UTF8.GetString(bytes); + values = default; + } + else + { + value = default; + values = default; + } + } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs b/src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs new file mode 100644 index 0000000..a4618d0 --- /dev/null +++ b/src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs @@ -0,0 +1,9 @@ +namespace Arcturus.EventBus.Diagnostics; + +public interface IDiagnosticProperties +{ + string? AppId { get; } + string? MessageId { get; } + string? CorrelationId { get; } + IDictionary? Headers { get; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus/Usings.cs b/src/Arcturus.EventBus/Usings.cs new file mode 100644 index 0000000..51519b3 --- /dev/null +++ b/src/Arcturus.EventBus/Usings.cs @@ -0,0 +1,3 @@ +global using System.Diagnostics; +global using System.Reflection; +global using System.Text; \ No newline at end of file