Skip to content

Commit

Permalink
Merge pull request #25 from cloudfy/24-arcturuseventbus
Browse files Browse the repository at this point in the history
24 arcturuseventbus
  • Loading branch information
cloudfy authored Dec 3, 2024
2 parents c0e9014 + c6affb8 commit 8796bcd
Show file tree
Hide file tree
Showing 30 changed files with 865 additions and 0 deletions.
24 changes: 24 additions & 0 deletions Arcturus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arcturus.Extensions.ResultO
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.AspNetCore.Endpoints", "src\Arcturus.AspNetCore.Endpoints\Arcturus.AspNetCore.Endpoints.csproj", "{2CF44FC8-346B-44A9-9B80-2D7A16E2B474}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.Abstracts", "src\Arcturus.EventBus.Abstracts\Arcturus.EventBus.Abstracts.csproj", "{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus", "src\Arcturus.EventBus\Arcturus.EventBus.csproj", "{ED23F692-87DC-4753-A67B-D6F94F3BBA00}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.RabbitMQ", "src\Arcturus.EventBus.RabbitMQ\Arcturus.EventBus.RabbitMQ.csproj", "{86F28A25-A376-411F-B67B-FB6FC3742566}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcturus.EventBus.OpenTelemetry", "src\Arcturus.EventBus.OpenTelemetry\Arcturus.EventBus.OpenTelemetry.csproj", "{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -33,6 +41,22 @@ Global
{2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2CF44FC8-346B-44A9-9B80-2D7A16E2B474}.Release|Any CPU.Build.0 = Release|Any CPU
{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{491AEBF9-2D1C-4E79-928E-B15EEAD1533A}.Release|Any CPU.Build.0 = Release|Any CPU
{ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Release|Any CPU.ActiveCfg = Release|Any CPU
{ED23F692-87DC-4753-A67B-D6F94F3BBA00}.Release|Any CPU.Build.0 = Release|Any CPU
{86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{86F28A25-A376-411F-B67B-FB6FC3742566}.Debug|Any CPU.Build.0 = Debug|Any CPU
{86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.ActiveCfg = Release|Any CPU
{86F28A25-A376-411F-B67B-FB6FC3742566}.Release|Any CPU.Build.0 = Release|Any CPU
{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DEFC8EF4-E8BD-48F5-99F3-C9D389A1D4AE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
3 changes: 3 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ Arcturus is a ready-to-use framework designed for building modern cloud and dist
| Arcturus.ResultObjects | Result objects for not relying on exceptions as code flow. | Arcturus.ResultObjects |
| Arcturus.Extensions.ResultObjects.AspNetCore | Extensions for using Result objects in ASP.NET Core. | Arcturus.Extensions.ResultObjects.AspNetCore |
| Arcturus.AspNetCore.Endpoints | Enable quick and easy to use API endpoints. | Arcturus.AspNetCore.Endpoints |
| Arcturus.EventBus | Event bus for publish/subscribe pattern. | Arcturus.EventBus |
| Arcturus.EventBus.RabbitMQ | RabbitMQ implementation for Arcturus.EventBus. | Arcturus.EventBus.RabbitMQ |
| Arcturus.EventBus.Abstracts | Abstracts for EventBus implementation | Arcturus.EventBus.Abstracts |

## Installation
Using .NET CLI
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Arcturus.EventBus.Abstracts</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>eventbus,events,arcturus</PackageTags>
</PropertyGroup>

</Project>
13 changes: 13 additions & 0 deletions src/Arcturus.EventBus.Abstracts/IConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Arcturus.EventBus.Abstracts;

public interface IConnection
{
/// <summary>
/// Gets the name of the application.
/// </summary>
string? ApplicationId { get; }
/// <summary>
/// Gets a value indicating if the connection is connected.
/// </summary>
bool IsConnected { get; }
}
8 changes: 8 additions & 0 deletions src/Arcturus.EventBus.Abstracts/IEventBusFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Arcturus.EventBus.Abstracts;

public interface IEventBusFactory
{
IProcessor CreateProcessor(string? queue = null);
IPublisher CreatePublisher(string? queue = null);
ISubscriber CreateSubscriber();
}
3 changes: 3 additions & 0 deletions src/Arcturus.EventBus.Abstracts/IEventMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Arcturus.EventBus.Abstracts;

public interface IEventMessage { }
6 changes: 6 additions & 0 deletions src/Arcturus.EventBus.Abstracts/IEventMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Arcturus.EventBus.Abstracts;

public interface IEventMessageHandler<in TEvent> where TEvent : IEventMessage
{
Task Handle(TEvent @event, CancellationToken cancellationToken = default);
}
17 changes: 17 additions & 0 deletions src/Arcturus.EventBus.Abstracts/IProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Arcturus.EventBus.Abstracts;

public interface IProcessor
{
/// <summary>
/// Event triggered when a message is processed asynchronously.
/// </summary>
event Func<IEventMessage, OnProcessEventArgs?, Task> OnProcessAsync;

/// <summary>
/// Waits for events to be processed.
/// </summary>
/// <param name="cancellationToken">Token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
Task WaitForEvents(
CancellationToken cancellationToken = default);
}
15 changes: 15 additions & 0 deletions src/Arcturus.EventBus.Abstracts/IPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Arcturus.EventBus.Abstracts;

public interface IPublisher
{
/// <summary>
///
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <param name="event"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task Publish<TEvent>(
TEvent @event
, CancellationToken cancellationToken = default) where TEvent : IEventMessage;
}
8 changes: 8 additions & 0 deletions src/Arcturus.EventBus.Abstracts/ISubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Arcturus.EventBus.Abstracts;

public interface ISubscriber
{
Task Subscribe<TEvent>(
Func<TEvent, Task> handler
, CancellationToken cancellationToken = default) where TEvent : IEventMessage;
}
26 changes: 26 additions & 0 deletions src/Arcturus.EventBus.Abstracts/OnProcessEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace Arcturus.EventBus.Abstracts;

public sealed class OnProcessEventArgs : EventArgs
{
public OnProcessEventArgs(
string? messageId
, IDictionary<string, object?>? headers
, CancellationToken cancellationToken = default)
{
MessageId = messageId;
Headers = headers;
CancellationToken = cancellationToken;
}
/// <summary>
/// Gets a dictionary of headers or null.
/// </summary>
public IDictionary<string, object?>? Headers { get; }
/// <summary>
/// Gets an id of the message or null.
/// </summary>
public string? MessageId { get; }
/// <summary>
/// Gets a cancellation token or null.
/// </summary>
public CancellationToken? CancellationToken { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Arcturus.EventBus.OpenTelemetry</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>eventbus,events,arcturus,opentelemetry</PackageTags>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" Version="1.10.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Arcturus.EventBus\Arcturus.EventBus.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
using OpenTelemetry;
using System.Diagnostics;
using System.Text;
using Arcturus.EventBus.Diagnostics;

namespace Arcturus.EventBus.OpenTelemetry;

public static class TraceProviderBuilderExtensions
{
public static TracerProviderBuilder AddEventBusInstrumentation(this TracerProviderBuilder builder)
{
EventBusActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
EventBusActivitySource.ContextInjector = OpenTelemetryContextInjector;
builder.AddSource(EventBusActivitySource.ActivityName);
return builder;
}

private static ActivityContext OpenTelemetryContextExtractor(IDiagnosticProperties props)
{
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
Baggage.Current = parentContext.Baggage;
return parentContext.ActivityContext;
}

private static IEnumerable<string> OpenTelemetryContextGetter(IDictionary<string, object?>? carrier, string key)
{
try
{
if (carrier is not null && carrier.TryGetValue(key, out object? value) && value is byte[] bytes)
{
return [Encoding.UTF8.GetString(bytes)];
}
}
catch (Exception)
{
//this.logger.LogError(ex, "Failed to extract trace context.");
}

return [];
}

private static void OpenTelemetryContextInjector(Activity activity, IDictionary<string, object?>? props)
{
// Inject the current Activity's context into the message headers.
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
}

private static void OpenTelemetryContextSetter(IDictionary<string, object?>? carrier, string key, string value)
{
if (carrier is null)
return;
carrier[key] = Encoding.UTF8.GetBytes(value);
}
}
21 changes: 21 additions & 0 deletions src/Arcturus.EventBus.RabbitMQ/Arcturus.EventBus.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Arcturus.EventBus.RabbitMQ</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>eventbus,events,arcturus,rabbitmq</PackageTags>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="Polly" Version="8.5.0" />
<PackageReference Include="OpenTelemetry.Api" Version="1.10.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Arcturus.EventBus\Arcturus.EventBus.csproj" />
</ItemGroup>


</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Arcturus.EventBus.RabbitMQ.Internals;

/// <summary>
/// Default type resolver using the AppDomain.
/// </summary>
internal class DefaultEventMessageTypeResolver
{
private readonly Dictionary<string, Type?> _reflectionCache = [];

/// <summary>
/// Resolves <paramref name="typeName"/> to a <see cref="Type"/>.
/// </summary>
/// <param name="typeName">Required. Name of type to resolve.</param>
/// <returns><see cref="Type"/> or null.</returns>
internal Type? ResolveType(string typeName)
{
ArgumentNullException.ThrowIfNull(typeName, nameof(typeName));

if (_reflectionCache.TryGetValue(typeName, out var type))
return type;

type = AppDomain.CurrentDomain
.GetAssemblies()
.Select(a => a.GetType(typeName))
.Where(_ => _ is not null)
.FirstOrDefault();
if (type is not null)
{
_reflectionCache.Add(typeName, type);
return type;
}
return null;
}
}
67 changes: 67 additions & 0 deletions src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using Arcturus.EventBus.Abstracts;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Arcturus.EventBus.RabbitMQ.Internals;

internal sealed class EventMessageConverter : JsonConverter<IEventMessage>
{
private const string DiscriminatorPropertyName = "$eventType";
private readonly DefaultEventMessageTypeResolver _typeResolver;

internal EventMessageConverter()
{
_typeResolver = new DefaultEventMessageTypeResolver();
}

public override IEventMessage Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
using var document = JsonDocument.ParseValue(ref reader);
var root = document.RootElement;

// Get the discriminator
if (!root.TryGetProperty(DiscriminatorPropertyName, out var typeProperty))
{
throw new JsonException($"Missing discriminator property '{DiscriminatorPropertyName}'.");
}

var typeName = typeProperty.GetString() ?? throw new JsonException("Discriminator property value is null.");
var type = _typeResolver.ResolveType(typeName) ?? throw new JsonException($"Cannot resolve type '{typeName}'.");
var json = root.GetRawText();
return (IEventMessage)JsonSerializer.Deserialize(json, type, options)!;
}

public override void Write(Utf8JsonWriter writer, IEventMessage value, JsonSerializerOptions options)
{
if (value == null)
{
writer.WriteNullValue();
return;
}// Get the actual type of the object
var type = value.GetType();
writer.WriteStartObject();

// Write the discriminator
writer.WriteString(DiscriminatorPropertyName, type.FullName);

// Serialize the object properties
var properties = type.GetProperties();
foreach (var property in properties)
{
var propertyValue = property.GetValue(value);
if (propertyValue != null)
{
writer.WritePropertyName(property.Name);
JsonSerializer.Serialize(writer, propertyValue, propertyValue?.GetType() ?? typeof(object), options);
}
else if (propertyValue == null &&
options.DefaultIgnoreCondition != JsonIgnoreCondition.WhenWritingNull)
{
writer.WritePropertyName(property.Name);
writer.WriteNullValue();
}
}

writer.WriteEndObject();
}
}
19 changes: 19 additions & 0 deletions src/Arcturus.EventBus.RabbitMQ/Internals/EventMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Arcturus.EventBus.Abstracts;
using System.Text.Json;

namespace Arcturus.EventBus.RabbitMQ.Internals;

internal static class EventMessageSerializer
{
private static JsonSerializerOptions _options = new()
{
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
, WriteIndented = false
, Converters = { new EventMessageConverter() }
};

internal static string Serialize(IEventMessage @event)
=> JsonSerializer.Serialize<IEventMessage>(@event, _options);
internal static IEventMessage Deserialize(string message)
=> JsonSerializer.Deserialize<IEventMessage>(message, _options)!;
}
Loading

0 comments on commit 8796bcd

Please sign in to comment.