diff --git a/Arcturus.sln b/Arcturus.sln index 8526d9f..d72824a 100644 --- a/Arcturus.sln +++ b/Arcturus.sln @@ -15,6 +15,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcturus.Extensions.ResultO EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.AspNetCore.Endpoints", "src\Arcturus.AspNetCore.Endpoints\Arcturus.AspNetCore.Endpoints.csproj", "{2CF44FC8-346B-44A9-9B80-2D7A16E2B474}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.Abstracts", "src\Arcturus.EventBus.Abstracts\Arcturus.EventBus.Abstracts.csproj", "{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus", "src\Arcturus.EventBus\Arcturus.EventBus.csproj", "{ED23F692-87DC-4753-A67B-D6F94F3BBA00}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.RabbitMQ", "src\Arcturus.EventBus.RabbitMQ\Arcturus.EventBus.RabbitMQ.csproj", "{86F28A25-A376-411F-B67B-FB6FC3742566}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.OpenTelemetry", "src\Arcturus.EventBus.OpenTelemetry\Arcturus.EventBus.OpenTelemetry.csproj", "{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -33,6 +41,22 @@ Global {2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Debug|Any CPU.Build.0 = Debug|Any CPU {2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Release|Any CPU.ActiveCfg = Release|Any CPU {2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Release|Any CPU.Build.0 = Release|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Release|Any CPU.Build.0 = Release|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Release|Any CPU.Build.0 = Release|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.Build.0 = Debug|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.ActiveCfg = Release|Any CPU + {86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.Build.0 = Release|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/docs/README.md b/docs/README.md index a31b271..0b9af12 100644 --- a/docs/README.md +++ b/docs/README.md @@ -10,6 +10,9 @@ Arcturus is a ready-to-use framework designed for building modern cloud and dist | Arcturus.ResultObjects | Result objects for not relying on exceptions as code flow. | Arcturus.ResultObjects | | Arcturus.Extensions.ResultObjects.AspNetCore | Extensions for using Result objects in ASP.NET Core. | Arcturus.Extensions.ResultObjects.AspNetCore | | Arcturus.AspNetCore.Endpoints | Enable quick and easy to use API endpoints. | Arcturus.AspNetCore.Endpoints | +| Arcturus.EventBus | Event bus for publish/subscribe pattern. | Arcturus.EventBus | +| Arcturus.EventBus.RabbitMQ | RabbitMQ implementation for Arcturus.EventBus. | Arcturus.EventBus.RabbitMQ | +| Arcturus.EventBus.Abstracts | Abstracts for EventBus implementation | Arcturus.EventBus.Abstracts | ## Installation Using .NET CLI diff --git a/src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj b/src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj new file mode 100644 index 0000000..a84b2ef --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/Arcturus.EventBus.Abstracts.csproj @@ -0,0 +1,9 @@ + + + + Arcturus.EventBus.Abstracts + MIT + eventbus,events,arcturus + + + diff --git a/src/Arcturus.EventBus.Abstracts/IConnection.cs b/src/Arcturus.EventBus.Abstracts/IConnection.cs new file mode 100644 index 0000000..5c5e7e1 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IConnection.cs @@ -0,0 +1,13 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IConnection +{ + /// + /// Gets the name of the application. + /// + string? ApplicationId { get; } + /// + /// Gets a value indicating if the connection is connected. + /// + bool IsConnected { get; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs b/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs new file mode 100644 index 0000000..bb854ed --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs @@ -0,0 +1,8 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IEventBusFactory +{ + IProcessor CreateProcessor(string? queue = null); + IPublisher CreatePublisher(string? queue = null); + ISubscriber CreateSubscriber(); +} diff --git a/src/Arcturus.EventBus.Abstracts/IEventMessage.cs b/src/Arcturus.EventBus.Abstracts/IEventMessage.cs new file mode 100644 index 0000000..170edb0 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IEventMessage.cs @@ -0,0 +1,3 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IEventMessage { } \ No newline at end of file diff --git a/src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs b/src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs new file mode 100644 index 0000000..3aa3c11 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs @@ -0,0 +1,6 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IEventMessageHandler where TEvent : IEventMessage +{ + Task Handle(TEvent @event, CancellationToken cancellationToken = default); +} diff --git a/src/Arcturus.EventBus.Abstracts/IProcessor.cs b/src/Arcturus.EventBus.Abstracts/IProcessor.cs new file mode 100644 index 0000000..b4ee196 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IProcessor.cs @@ -0,0 +1,17 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IProcessor +{ + /// + /// Event triggered when a message is processed asynchronously. + /// + event Func OnProcessAsync; + + /// + /// Waits for events to be processed. + /// + /// Token to monitor for cancellation requests. + /// A task that represents the asynchronous operation. + Task WaitForEvents( + CancellationToken cancellationToken = default); +} diff --git a/src/Arcturus.EventBus.Abstracts/IPublisher.cs b/src/Arcturus.EventBus.Abstracts/IPublisher.cs new file mode 100644 index 0000000..23a2c76 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/IPublisher.cs @@ -0,0 +1,15 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface IPublisher +{ + /// + /// + /// + /// + /// + /// + /// + Task Publish( + TEvent @event + , CancellationToken cancellationToken = default) where TEvent : IEventMessage; +} diff --git a/src/Arcturus.EventBus.Abstracts/ISubscriber.cs b/src/Arcturus.EventBus.Abstracts/ISubscriber.cs new file mode 100644 index 0000000..6c7e3c9 --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/ISubscriber.cs @@ -0,0 +1,8 @@ +namespace Arcturus.EventBus.Abstracts; + +public interface ISubscriber +{ + Task Subscribe( + Func handler + , CancellationToken cancellationToken = default) where TEvent : IEventMessage; +} diff --git a/src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs b/src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs new file mode 100644 index 0000000..debffed --- /dev/null +++ b/src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs @@ -0,0 +1,26 @@ +namespace Arcturus.EventBus.Abstracts; + +public sealed class OnProcessEventArgs : EventArgs +{ + public OnProcessEventArgs( + string? messageId + , IDictionary? headers + , CancellationToken cancellationToken = default) + { + MessageId = messageId; + Headers = headers; + CancellationToken = cancellationToken; + } + /// + /// Gets a dictionary of headers or null. + /// + public IDictionary? Headers { get; } + /// + /// Gets an id of the message or null. + /// + public string? MessageId { get; } + /// + /// Gets a cancellation token or null. + /// + public CancellationToken? CancellationToken { get; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj b/src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj new file mode 100644 index 0000000..d06a8d0 --- /dev/null +++ b/src/Arcturus.EventBus.OpenTelemetry/Arcturus.EventBus.OpenTelemetry.csproj @@ -0,0 +1,17 @@ + + + + Arcturus.EventBus.OpenTelemetry + MIT + eventbus,events,arcturus,opentelemetry + + + + + + + + + + + diff --git a/src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs b/src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs new file mode 100644 index 0000000..5865ffb --- /dev/null +++ b/src/Arcturus.EventBus.OpenTelemetry/TraceProviderBuilderExtensions.cs @@ -0,0 +1,57 @@ +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; +using OpenTelemetry; +using System.Diagnostics; +using System.Text; +using Arcturus.EventBus.Diagnostics; + +namespace Arcturus.EventBus.OpenTelemetry; + +public static class TraceProviderBuilderExtensions +{ + public static TracerProviderBuilder AddEventBusInstrumentation(this TracerProviderBuilder builder) + { + EventBusActivitySource.ContextExtractor = OpenTelemetryContextExtractor; + EventBusActivitySource.ContextInjector = OpenTelemetryContextInjector; + builder.AddSource(EventBusActivitySource.ActivityName); + return builder; + } + + private static ActivityContext OpenTelemetryContextExtractor(IDiagnosticProperties props) + { + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + Baggage.Current = parentContext.Baggage; + return parentContext.ActivityContext; + } + + private static IEnumerable OpenTelemetryContextGetter(IDictionary? carrier, string key) + { + try + { + if (carrier is not null && carrier.TryGetValue(key, out object? value) && value is byte[] bytes) + { + return [Encoding.UTF8.GetString(bytes)]; + } + } + catch (Exception) + { + //this.logger.LogError(ex, "Failed to extract trace context."); + } + + return []; + } + + private static void OpenTelemetryContextInjector(Activity activity, IDictionary? props) + { + // Inject the current Activity's context into the message headers. + Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); + } + + private static void OpenTelemetryContextSetter(IDictionary? carrier, string key, string value) + { + if (carrier is null) + return; + carrier[key] = Encoding.UTF8.GetBytes(value); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj b/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj new file mode 100644 index 0000000..dccf0b6 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj @@ -0,0 +1,21 @@ + + + + Arcturus.EventBus.RabbitMQ + MIT + eventbus,events,arcturus,rabbitmq + + + + + + + + + + + + + + + diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs new file mode 100644 index 0000000..cf04e0e --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/DefaultEventMessageTypeResolver.cs @@ -0,0 +1,34 @@ +namespace Arcturus.EventBus.RabbitMQ.Internals; + +/// +/// Default type resolver using the AppDomain. +/// +internal class DefaultEventMessageTypeResolver +{ + private readonly Dictionary _reflectionCache = []; + + /// + /// Resolves to a . + /// + /// Required. Name of type to resolve. + /// or null. + internal Type? ResolveType(string typeName) + { + ArgumentNullException.ThrowIfNull(typeName, nameof(typeName)); + + if (_reflectionCache.TryGetValue(typeName, out var type)) + return type; + + type = AppDomain.CurrentDomain + .GetAssemblies() + .Select(a => a.GetType(typeName)) + .Where(_ => _ is not null) + .FirstOrDefault(); + if (type is not null) + { + _reflectionCache.Add(typeName, type); + return type; + } + return null; + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs new file mode 100644 index 0000000..53f760e --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs @@ -0,0 +1,67 @@ +using Arcturus.EventBus.Abstracts; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Arcturus.EventBus.RabbitMQ.Internals; + +internal sealed class EventMessageConverter : JsonConverter +{ + private const string DiscriminatorPropertyName = "$eventType"; + private readonly DefaultEventMessageTypeResolver _typeResolver; + + internal EventMessageConverter() + { + _typeResolver = new DefaultEventMessageTypeResolver(); + } + + public override IEventMessage Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + using var document = JsonDocument.ParseValue(ref reader); + var root = document.RootElement; + + // Get the discriminator + if (!root.TryGetProperty(DiscriminatorPropertyName, out var typeProperty)) + { + throw new JsonException($"Missing discriminator property '{DiscriminatorPropertyName}'."); + } + + var typeName = typeProperty.GetString() ?? throw new JsonException("Discriminator property value is null."); + var type = _typeResolver.ResolveType(typeName) ?? throw new JsonException($"Cannot resolve type '{typeName}'."); + var json = root.GetRawText(); + return (IEventMessage)JsonSerializer.Deserialize(json, type, options)!; + } + + public override void Write(Utf8JsonWriter writer, IEventMessage value, JsonSerializerOptions options) + { + if (value == null) + { + writer.WriteNullValue(); + return; + }// Get the actual type of the object + var type = value.GetType(); + writer.WriteStartObject(); + + // Write the discriminator + writer.WriteString(DiscriminatorPropertyName, type.FullName); + + // Serialize the object properties + var properties = type.GetProperties(); + foreach (var property in properties) + { + var propertyValue = property.GetValue(value); + if (propertyValue != null) + { + writer.WritePropertyName(property.Name); + JsonSerializer.Serialize(writer, propertyValue, propertyValue?.GetType() ?? typeof(object), options); + } + else if (propertyValue == null && + options.DefaultIgnoreCondition != JsonIgnoreCondition.WhenWritingNull) + { + writer.WritePropertyName(property.Name); + writer.WriteNullValue(); + } + } + + writer.WriteEndObject(); + } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs new file mode 100644 index 0000000..fd06154 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs @@ -0,0 +1,19 @@ +using Arcturus.EventBus.Abstracts; +using System.Text.Json; + +namespace Arcturus.EventBus.RabbitMQ.Internals; + +internal static class EventMessageSerializer +{ + private static JsonSerializerOptions _options = new() + { + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull + , WriteIndented = false + , Converters = { new EventMessageConverter() } + }; + + internal static string Serialize(IEventMessage @event) + => JsonSerializer.Serialize(@event, _options); + internal static IEventMessage Deserialize(string message) + => JsonSerializer.Deserialize(message, _options)!; +} diff --git a/src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs b/src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs new file mode 100644 index 0000000..c9c5cb4 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/OpenTelemetry/OpenTelemetryExtensions.cs @@ -0,0 +1,56 @@ +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; +using RabbitMQ.Client; +using System.Text; + +namespace Arcturus.EventBus.RabbitMQ.OpenTelemetry; + +public static class OpenTelemetryExtensions +{ + public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder) + { + RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor; + RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector; + builder.AddSource("RabbitMQ.Client.*"); + return builder; + } + + private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props) + { + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + Baggage.Current = parentContext.Baggage; + return parentContext.ActivityContext; + } + + private static IEnumerable OpenTelemetryContextGetter(IDictionary? carrier, string key) + { + try + { + if (carrier is not null && carrier.TryGetValue(key, out object? value) && value is byte[] bytes) + { + return [Encoding.UTF8.GetString(bytes)]; + } + } + catch (Exception) + { + //this.logger.LogError(ex, "Failed to extract trace context."); + } + + return []; + } + + private static void OpenTelemetryContextInjector(Activity activity, IDictionary? props) + { + // Inject the current Activity's context into the message headers. + Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); + } + + private static void OpenTelemetryContextSetter(IDictionary? carrier, string key, string value) + { + if (carrier is null) + return; + carrier[key] = Encoding.UTF8.GetBytes(value); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs new file mode 100644 index 0000000..18f6a11 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQConnection.cs @@ -0,0 +1,57 @@ +using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.Threading; +using RMQ = RabbitMQ.Client; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQConnection : IConnection +{ + private string? _applicationId { get; } + private string _clientName { get; } + private string _connectionHostName { get; } + private RMQ.IChannel? _channel; + private RMQ.IConnection? _connection; + private bool _isConnected = false; + public string? ApplicationId => _applicationId; + public bool IsConnected => _isConnected; + + private readonly AsyncLock _asyncLock = new(); + + internal RMQ.IChannel Channel => _channel!; + + internal RabbitMQConnection(RabbitMQEventBusOptions options) + { + _applicationId = options.ApplicationId; + _clientName = options.ClientName ?? Environment.MachineName; + _connectionHostName = options.HostName ?? "localhost"; + } + + internal async Task Connect(CancellationToken cancellationToken = default) + { + var factory = new RMQ.ConnectionFactory { HostName = _connectionHostName }; + + using (await _asyncLock.LockAsync(cancellationToken)) + { + _connection = await factory.CreateConnectionAsync(_clientName, cancellationToken); + _connection.ConnectionShutdownAsync += (sender, args) => + { + _isConnected = false; + return Task.CompletedTask; + }; + _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + + _isConnected = true; + } + } + internal Task EnsureConnected(CancellationToken cancellationToken = default) + { + if (!IsConnected) + return Connect(cancellationToken); + return Task.CompletedTask; + } + public void Dispose() + { + _channel?.Dispose(); + _connection?.Dispose(); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs new file mode 100644 index 0000000..d1e1c4b --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusFactory.cs @@ -0,0 +1,20 @@ +using Arcturus.EventBus.Abstracts; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQEventBusFactory( + IConnection connection) : IEventBusFactory +{ + public IPublisher CreatePublisher(string? queue = null) + { + return new RabbitMQPublisher(connection, queue); + } + public IProcessor CreateProcessor(string? queue = null) + { + return new RabbitMQProcessor(connection, queue); + } + public ISubscriber CreateSubscriber() + { + throw new NotImplementedException(); + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs new file mode 100644 index 0000000..c119c7b --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs @@ -0,0 +1,17 @@ +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQEventBusOptions +{ + /// + /// Gets or sets an application id. + /// + public string? ApplicationId { get; set; } + /// + /// Gets or sets a client name. + /// + public string? ClientName { get; set; } + /// + /// Gets or sets a host name. Defaults to localhost. + /// + public string? HostName { get; set; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs new file mode 100644 index 0000000..53a7143 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQProcessor.cs @@ -0,0 +1,66 @@ +using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.RabbitMQ.Internals; +using RabbitMQ.Client.Events; +using RabbitMQ.Client; +using System.Text; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQProcessor : IProcessor +{ + public event Func? OnProcessAsync; + private readonly RabbitMQConnection _connection; + private readonly string _queueName; + + internal RabbitMQProcessor(Abstracts.IConnection connection, string? queueName = null) + { + if (connection is not RabbitMQConnection) + throw new NotImplementedException(); + + _connection = (RabbitMQConnection)connection; + _queueName = queueName ?? "default_queue"; + } + + public async Task WaitForEvents(CancellationToken cancellationToken = default) + { + await _connection.EnsureConnected(cancellationToken); + + // consumer + await _connection.Channel.QueueDeclareAsync( + queue: _queueName + , durable: true + , exclusive: false + , autoDelete: false + , arguments: null + , cancellationToken: cancellationToken); + + await _connection.Channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken); + // Console.WriteLine(" [*] Waiting for messages."); + + var consumer = new AsyncEventingBasicConsumer(_connection.Channel); + consumer.ReceivedAsync += async (model, ea) => + { + byte[] body = ea.Body.ToArray(); + var message = Encoding.UTF8.GetString(body); + + var @event = EventMessageSerializer.Deserialize(message); + + if (OnProcessAsync is not null) + await OnProcessAsync.Invoke( + @event + , new OnProcessEventArgs( + ea.BasicProperties.MessageId + , ea.BasicProperties.Headers + , ea.CancellationToken)); + + await _connection.Channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false, cancellationToken); + }; + + await _connection.Channel.BasicConsumeAsync(_queueName, autoAck: false, consumer: consumer, cancellationToken); + // await _connection.Channel.BasicConsumeAsync(queueName, false, null, false, false, null, consumer, cancellationToken); + while (!cancellationToken.IsCancellationRequested) + { + cancellationToken.WaitHandle.WaitOne(100); + } + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs new file mode 100644 index 0000000..15ce8fd --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/RabbitMQPublisher.cs @@ -0,0 +1,66 @@ +using Arcturus.EventBus.Abstracts; +using Arcturus.EventBus.Diagnostics; +using Arcturus.EventBus.RabbitMQ.Internals; +using Polly; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using System.Net.Sockets; +using System.Text; + +namespace Arcturus.EventBus.RabbitMQ; + +public sealed class RabbitMQPublisher : IPublisher +{ + private readonly RabbitMQConnection _connection; + private readonly string _queueName; + + internal RabbitMQPublisher(Abstracts.IConnection connection, string? queueName = null) + { + if (connection is not RabbitMQConnection) + throw new NotImplementedException($"Requires RabbitMQConnection"); + + _connection = (RabbitMQConnection)connection; + _queueName = queueName ?? "default_queue"; + } + + public async Task Publish( + TEvent @event + , CancellationToken cancellationToken = default) where TEvent : IEventMessage + { + // retry policy + var policy = Policy + .Handle() + .Or() + .Or() + .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (exception, timeSpan) => { + // _logger.LogWarning(exception, "Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}.", @event.Id, $"{timeSpan.TotalSeconds:n1}", exception.Message); + }); + + await policy.Execute(async () => { + await _connection.EnsureConnected(cancellationToken); + + await _connection.Channel.QueueDeclareAsync( + queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null, cancellationToken: cancellationToken); + + var message = EventMessageSerializer.Serialize(@event); + var body = Encoding.UTF8.GetBytes(message); + + using Activity? sendActivity = EventBusActivitySource.PublisherHasListeners + ? EventBusActivitySource.Publish(@event.GetType().Name) + : default; + //sendActivity?.SetTag("eventbus.message.name", @event.GetType().Name); + + var properties = new BasicProperties + { + Persistent = true + , AppId = _connection.ApplicationId + , CorrelationId = null + , Headers = null + , MessageId = Guid.NewGuid().ToString() + }; + + await _connection.Channel.BasicPublishAsync( + exchange: string.Empty, routingKey: _queueName, mandatory: true, basicProperties: properties, body: body); + }); + } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs b/src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs new file mode 100644 index 0000000..6b33149 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/ServiceExtensions.cs @@ -0,0 +1,24 @@ +using Arcturus.EventBus.Abstracts; +using Microsoft.Extensions.DependencyInjection; + +namespace Arcturus.EventBus.RabbitMQ; + +public static class ServicesExtensions +{ + public static IServiceCollection AddRabbitMQEventBus( + this IServiceCollection services + , Action? options = null) + { + var currentOptions = new RabbitMQEventBusOptions(); + if (options is not null) + { + options(currentOptions); + } + + services.AddSingleton( + (sp) => { return new RabbitMQConnection(currentOptions); }); + services.AddSingleton(); + + return services; + } +} diff --git a/src/Arcturus.EventBus.RabbitMQ/Usings.cs b/src/Arcturus.EventBus.RabbitMQ/Usings.cs new file mode 100644 index 0000000..a335706 --- /dev/null +++ b/src/Arcturus.EventBus.RabbitMQ/Usings.cs @@ -0,0 +1,2 @@ +global using System.Diagnostics; +global using System.Reflection; diff --git a/src/Arcturus.EventBus/Arcturus.EventBus.csproj b/src/Arcturus.EventBus/Arcturus.EventBus.csproj new file mode 100644 index 0000000..5adda9b --- /dev/null +++ b/src/Arcturus.EventBus/Arcturus.EventBus.csproj @@ -0,0 +1,18 @@ + + + + Arcturus.EventBus + MIT + eventbus,events,arcturus + + + + + + + + + + + + diff --git a/src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs b/src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs new file mode 100644 index 0000000..c9919b1 --- /dev/null +++ b/src/Arcturus.EventBus/Diagnostics/EventBusActivitySource.cs @@ -0,0 +1,153 @@ +namespace Arcturus.EventBus.Diagnostics; + +public static class EventBusActivitySource +{ + private static readonly string AssemblyVersion = typeof(EventBusActivitySource).Assembly + .GetCustomAttribute() + ?.InformationalVersion ?? ""; + private static readonly ActivitySource _publisherSource = new (PublisherSourceName, AssemblyVersion); + private static readonly ActivitySource _subscriberSource = new (SubscriberSourceName, AssemblyVersion); + + internal const string MessageId = "eventbus.message.id"; + internal const string MessageConversationId = "eventbus.message.conversation_id"; + internal const string MessagingSystem = "EventBus"; + internal const string MessagingOperationTypePublish = "publish"; + internal const string MessagingOperationTypeProcess = "process"; + internal const string MessagingOperationTypeReceive = "receive"; + internal const string MessagingOperationType = "eventbus.message.operation"; + + public static bool PublisherHasListeners => _publisherSource.HasListeners(); + + internal static readonly IEnumerable> CreationTags = + [ + new KeyValuePair(MessagingSystem, "rabbitmq") + ]; + + public static Activity? Publish(string eventMessageName, ActivityContext linkedContext = default) + { + if (!_publisherSource.HasListeners()) + { + return null; + } + Activity? activity = linkedContext == default + ? _publisherSource.StartEventBusActivity( + MessagingOperationTypePublish, // UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, + ActivityKind.Producer) + : _publisherSource.StartLinkedEventBusActivity( + MessagingOperationTypePublish, // UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, + ActivityKind.Producer, linkedContext); + if (activity != null && activity.IsAllDataRequested) + { + PopulateMessagingTags(MessagingOperationTypePublish, activity); + } + + return activity; + + } + + private static Activity? StartEventBusActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext parentContext = default) + { + return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start(); + } + + private static Activity? StartLinkedEventBusActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext linkedContext = default, ActivityContext parentContext = default) + { + return source.CreateActivity(name, kind, parentContext: parentContext, + links: new[] { new ActivityLink(linkedContext) }, idFormat: ActivityIdFormat.W3C, + tags: CreationTags) + ?.Start(); + } + + public const string PublisherSourceName = "Arcturus.EventBus.Publisher"; + public const string SubscriberSourceName = "Arcturus.EventBus.Subscriber"; + public const string ActivityName = "Arcturus.EventBus.*"; + + public static Action> ContextInjector { get; set; } = DefaultContextInjector; + public static Func ContextExtractor { get; set; } = DefaultContextExtractor; + + //private static void PopulateMessagingTags(string operation, Activity activity) + //{ + // PopulateMessagingTags(operation, activity); + + // if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId)) + // { + // activity.SetTag(MessageConversationId, readOnlyBasicProperties.CorrelationId); + // } + + // if (!string.IsNullOrEmpty(readOnlyBasicProperties.MessageId)) + // { + // activity.SetTag(MessageId, readOnlyBasicProperties.MessageId); + // } + //} + private static void PopulateMessagingTags(string operation, Activity activity) + { + activity + .SetTag(MessagingOperationType, operation); + //.SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange) + //.SetTag(MessagingDestinationRoutingKey, routingKey) + //.SetTag(MessagingBodySize, bodySize); + + //if (deliveryTag > 0) + //{ + // activity.SetTag(RabbitMQDeliveryTag, deliveryTag); + //} + } + + private static void DefaultContextInjector(Activity sendActivity, IDictionary props) + { + DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter); + } + private static ActivityContext DefaultContextExtractor(IDiagnosticProperties props) + { + if (props.Headers == null) + { + return default; + } + + bool hasHeaders = false; + foreach (string header in DistributedContextPropagator.Current.Fields) + { + if (props.Headers.ContainsKey(header)) + { + hasHeaders = true; + break; + } + } + + + if (!hasHeaders) + { + return default; + } + + DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string? traceParent, out string? traceState); + return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default; + } + + private static void DefaultContextSetter(object? carrier, string name, string value) + { + if (!(carrier is IDictionary carrierDictionary)) + { + return; + } + + // Only propagate headers if they haven't already been set + carrierDictionary[name] = value; + } + private static void DefaultContextGetter(object? carrier, string name, out string? value, out IEnumerable? values) + { + if (carrier is IDictionary carrierDict && + carrierDict.TryGetValue(name, out object? propsVal) && propsVal is byte[] bytes) + { + value = Encoding.UTF8.GetString(bytes); + values = default; + } + else + { + value = default; + values = default; + } + } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs b/src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs new file mode 100644 index 0000000..a4618d0 --- /dev/null +++ b/src/Arcturus.EventBus/Diagnostics/IDiagnosticProperties.cs @@ -0,0 +1,9 @@ +namespace Arcturus.EventBus.Diagnostics; + +public interface IDiagnosticProperties +{ + string? AppId { get; } + string? MessageId { get; } + string? CorrelationId { get; } + IDictionary? Headers { get; } +} \ No newline at end of file diff --git a/src/Arcturus.EventBus/Threading/AsyncLock.cs b/src/Arcturus.EventBus/Threading/AsyncLock.cs new file mode 100644 index 0000000..3bb9e26 --- /dev/null +++ b/src/Arcturus.EventBus/Threading/AsyncLock.cs @@ -0,0 +1,27 @@ +namespace Arcturus.EventBus.Threading; + +public sealed class AsyncLock +{ + private readonly SemaphoreSlim _semaphore = new(1, 1); + + public async Task LockAsync(CancellationToken cancellationToken = default) + { + await _semaphore.WaitAsync(cancellationToken); + return new Releaser(_semaphore); + } + + private sealed class Releaser : IDisposable + { + private readonly SemaphoreSlim _semaphore; + + internal Releaser(SemaphoreSlim semaphore) + { + _semaphore = semaphore; + } + + public void Dispose() + { + _semaphore.Release(); + } + } +} diff --git a/src/Arcturus.EventBus/Usings.cs b/src/Arcturus.EventBus/Usings.cs new file mode 100644 index 0000000..51519b3 --- /dev/null +++ b/src/Arcturus.EventBus/Usings.cs @@ -0,0 +1,3 @@ +global using System.Diagnostics; +global using System.Reflection; +global using System.Text; \ No newline at end of file