Skip to content

Commit

Permalink
feat: global events and open telemetry instrumentation (#451)
Browse files Browse the repository at this point in the history
Include events for message produce and consume:
- MessageConsumeCompleted
- MessageConsumeError
- MessageConsumeStarted
- MessageProduceCompleted
- MessageProduceError
- MessageProduceStarted

Include OpenTelemetry support for traces and baggage signals.
  • Loading branch information
simaoribeiro authored Oct 19, 2023
1 parent 3912651 commit 0926b94
Show file tree
Hide file tree
Showing 33 changed files with 1,378 additions and 25 deletions.
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
Expand Down
38 changes: 38 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace KafkaFlow.Configuration
{
/// <summary>
/// Provides access to events fired by the internals of the library
/// </summary>
public interface IGlobalEvents
{
/// <summary>
/// Gets the message consume completed event
/// </summary>
IEvent<MessageEventContext> MessageConsumeCompleted { get; }

/// <summary>
/// Gets the message consume error event
/// </summary>
IEvent<MessageErrorEventContext> MessageConsumeError { get; }

/// <summary>
/// Gets the message consume started event
/// </summary>
IEvent<MessageEventContext> MessageConsumeStarted { get; }

/// <summary>
/// Gets the message produce completed event
/// </summary>
IEvent<MessageEventContext> MessageProduceCompleted { get; }

/// <summary>
/// Gets the message produce error event
/// </summary>
IEvent<MessageErrorEventContext> MessageProduceError { get; }

/// <summary>
/// Gets the message produce started event
/// </summary>
IEvent<MessageEventContext> MessageProduceStarted { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,12 @@ public interface IKafkaConfigurationBuilder
/// <returns></returns>
IKafkaConfigurationBuilder UseLogHandler<TLogHandler>()
where TLogHandler : ILogHandler;

/// <summary>
/// Subscribe the global events defined in <see cref="IGlobalEvents"/>
/// </summary>
/// <param name="observers">A handle to subscribe the events</param>
/// <returns></returns>
IKafkaConfigurationBuilder SubscribeGlobalEvents(Action<IGlobalEvents> observers);
}
}
32 changes: 32 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace KafkaFlow

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / build

File is required to end with a single newline character [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / release

File is required to end with a single newline character [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<out TArg>
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}

Check warning on line 32 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Deploy to GitHub Pages

File is required to end with a single newline character
13 changes: 13 additions & 0 deletions src/KafkaFlow.Abstractions/IEventSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace KafkaFlow
{
/// <summary>
/// Represents an Event subscription.
/// </summary>
public interface IEventSubscription
{
/// <summary>
/// Cancels the subscription to the event.
/// </summary>
void Cancel();
}
}
6 changes: 6 additions & 0 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Collections.Generic;

/// <summary>
/// A context that contains the message and metadata
Expand All @@ -27,6 +28,11 @@ public interface IMessageContext
/// </summary>
IProducerContext ProducerContext { get; }

/// <summary>
/// Gets the items
/// </summary>
IDictionary<string, object> Items { get; }

/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
Expand Down
26 changes: 26 additions & 0 deletions src/KafkaFlow.Abstractions/MessageErrorEventContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace KafkaFlow
{
using System;

/// <summary>
/// Represents the errors in message context used in the events
/// </summary>
public class MessageErrorEventContext : MessageEventContext
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageErrorEventContext"/> class.
/// </summary>
/// <param name="messageContext">The message context</param>
/// <param name="exception">The event exception</param>
public MessageErrorEventContext(IMessageContext messageContext, Exception exception)
: base(messageContext)
{
this.Exception = exception;
}

/// <summary>
/// Gets the exception
/// </summary>
public Exception Exception { get; }
}
}
22 changes: 22 additions & 0 deletions src/KafkaFlow.Abstractions/MessageEventContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace KafkaFlow
{
/// <summary>
/// Represents a message context used in the events
/// </summary>
public class MessageEventContext
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageEventContext"/> class.
/// </summary>
/// <param name="messageContext">The message context</param>
public MessageEventContext(IMessageContext messageContext)
{
this.MessageContext = messageContext;
}

/// <summary>
/// Gets the message context
/// </summary>
public IMessageContext MessageContext { get; }
}
}
3 changes: 3 additions & 0 deletions src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public BatchConsumeMessageContext(
{
this.ConsumerContext = consumer;
this.Message = new Message(null, batchMessage);
this.Items = new Dictionary<string, object>();
}

public Message Message { get; }
Expand All @@ -21,6 +22,8 @@ public BatchConsumeMessageContext(

public IProducerContext ProducerContext => null;

public IDictionary<string, object> Items { get; }

public IMessageContext SetMessage(object key, object value) =>
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message");

Expand Down
8 changes: 5 additions & 3 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
};

services.AddKafka(
kafka => kafka
.UseLogHandler<TraceLogHandler>()
kafka =>
{
kafka.UseLogHandler<TraceLogHandler>()
.AddCluster(
cluster => cluster
.WithBrokers(kafkaBrokers.Split(';'))
Expand Down Expand Up @@ -332,7 +333,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.DefaultTopic(GzipTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()))));
.AddCompressor<GzipMessageCompressor>())));
});

services.AddSingleton<JsonProducer>();
services.AddSingleton<JsonGzipProducer>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.IntegrationTests.Core.Exceptions
{
using System;

public class ErrorExecutingMiddlewareException : Exception
{
public ErrorExecutingMiddlewareException(string middlewareName)
: base($"Exception thrown executing {middlewareName}")
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace KafkaFlow.IntegrationTests.Core.Exceptions
{
using System;

public class PartitionAssignmentException : Exception
{
private const string ExceptionMessage = "Partition assignment hasn't occurred yet.";

public PartitionAssignmentException()
: base(ExceptionMessage)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.IntegrationTests.Core.Producers
{
internal class JsonProducer2
{
}
}
Loading

0 comments on commit 0926b94

Please sign in to comment.