Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: refactor the IMessageContext to a more concise structure #152

Merged
merged 5 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions commitlint.config.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
module.exports = {
const Configuration = {
extends: ['@commitlint/config-conventional'],
}
rules: {
'body-max-line-length': [0, 'always'],
},
};
module.exports = Configuration;
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ public Task Handle(IMessageContext context, LogMessages1 message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.Partition,
context.Offset,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Severity.ToString());

return Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ public Task Handle(IMessageContext context, LogMessages2 message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.Partition,
context.Offset,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Message);

return Task.CompletedTask;
Expand Down
22 changes: 15 additions & 7 deletions samples/KafkaFlow.Sample.Avro/KafkaFlow.Sample.Avro.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro\KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro\KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample.BatchConsume/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next)

var text = string.Join(
'\n',
batch.Select(ctx => ((SampleBatchMessage) ctx.Message).Text));
batch.Select(ctx => ((SampleBatchMessage) ctx.Message.Value).Text));

Console.WriteLine(text);

Expand Down
24 changes: 16 additions & 8 deletions samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,26 @@
<IsPackable>false</IsPackable>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Admin.WebApi\KafkaFlow.Admin.WebApi.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.Admin\KafkaFlow.Admin.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj"/>
<ProjectReference Include="..\..\src\KafkaFlow.Admin.WebApi\KafkaFlow.Admin.WebApi.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Admin\KafkaFlow.Admin.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Swashbuckle.AspNetCore.Swagger" Version="5.5.1"/>
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="5.5.1"/>
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="5.5.1"/>
<PackageReference Include="Swashbuckle.AspNetCore.Swagger" Version="5.5.1" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="5.5.1" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="5.5.1" />
</ItemGroup>


Expand Down
8 changes: 8 additions & 0 deletions samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Admin\KafkaFlow.Admin.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Compressor.Gzip\KafkaFlow.Compressor.Gzip.csproj" />
Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample/PrintConsoleHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public Task Handle(IMessageContext context, TestMessage message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.Partition,
context.Offset,
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text);

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
IConsumerConfigurationBuilder WithAutoStoreOffsets();

/// <summary>
/// The client should call the <see cref="IMessageContextConsumer.StoreOffset()"/>
/// The client should call the <see cref="IConsumerContext.StoreOffset()"/>
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithManualStoreOffsets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,43 @@ namespace KafkaFlow
/// <summary>
/// Represents the message consumer
/// </summary>
public interface IMessageContextConsumer
public interface IConsumerContext
{
/// <summary>
/// Gets the consumer unique name defined in configuration
/// </summary>
string Name { get; }
string ConsumerName { get; }
filipeesch marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Gets a CancellationToken that is cancelled when the worker is requested to stop
/// </summary>
CancellationToken WorkerStopped { get; }

/// <summary>
/// Gets the worker id that is processing the message
/// </summary>
int WorkerId { get; }

/// <summary>
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
int Partition { get; }

/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long Offset { get; }

/// <summary>
/// Gets the consumer group id from kafka consumer that received the message
/// </summary>
string GroupId { get; }

/// <summary>
/// Gets message timestamp. By default is the UTC timestamp when the message was produced
/// </summary>
Expand Down
53 changes: 11 additions & 42 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,31 @@ namespace KafkaFlow
public interface IMessageContext
{
/// <summary>
/// Gets the worker id that is processing the message
/// Gets the message
/// </summary>
int WorkerId { get; }

/// <summary>
/// Gets the message key
/// </summary>
byte[] PartitionKey { get; }

/// <summary>
/// Gets the message value
/// </summary>
object Message { get; }
Message Message { get; }

/// <summary>
/// Gets the message headers
/// </summary>
IMessageHeaders Headers { get; }

/// <summary>
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
int? Partition { get; }

/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long? Offset { get; }

/// <summary>
/// Gets the consumer group id from kafka consumer that received the message
/// </summary>
string GroupId { get; }

/// <summary>
/// Gets the <see cref="IMessageContextConsumer"></see> from the consumed message
/// Gets the <see cref="IConsumerContext"></see> from the consumed message
/// </summary>
IMessageContextConsumer Consumer { get; }
IConsumerContext ConsumerContext { get; }

/// <summary>
/// Transforms the message to a new value
/// Gets the <see cref="IProducerContext"></see> from the produced message
/// </summary>
/// <param name="message">New message value</param>
void TransformMessage(object message);
IProducerContext ProducerContext { get; }

/// <summary>
/// Creates a clone of the current <see cref="IMessageContext"></see>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
/// <returns>
/// A clone of the current <see cref="IMessageContext"></see>
/// </returns>
IMessageContext Clone();
/// <param name="key">The new message key</param>
/// <param name="value">The new message value</param>
/// <returns>A new message context containing the new values</returns>
IMessageContext TransformMessage(object key, object value); // TODO: maybe a better name?
}
}
23 changes: 23 additions & 0 deletions src/KafkaFlow.Abstractions/IProducerContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace KafkaFlow
{
/// <summary>
/// Some producer metadata
/// </summary>
public interface IProducerContext
{
/// <summary>
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
int? Partition { get; }

/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long? Offset { get; }
filipeesch marked this conversation as resolved.
Show resolved Hide resolved
}
}
29 changes: 29 additions & 0 deletions src/KafkaFlow.Abstractions/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace KafkaFlow
{
/// <summary>
/// Represents a Kafka message
/// </summary>
public readonly struct Message
{
/// <summary>
/// Initializes a new instance of the <see cref="Message"/> struct.
/// </summary>
/// <param name="key"><inheritdoc cref="Key"/></param>
/// <param name="value"><inheritdoc cref="Value"/></param>
public Message(object key, object value)
{
this.Key = key;
this.Value = value;
}

/// <summary>
/// Gets the message key
/// </summary>
public object Key { get; }

/// <summary>
/// Gets the message value
/// </summary>
public object Value { get; }
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static IReadOnlyCollection<IMessageContext> GetMessagesBatch(this IMessag
{
if (context is BatchConsumeMessageContext ctx)
{
return (IReadOnlyCollection<IMessageContext>) ctx.Message;
return (IReadOnlyCollection<IMessageContext>) ctx.Message.Value;
}

throw new InvalidOperationException($"This method can only be used on {nameof(BatchConsumeMessageContext)}");
Expand Down
32 changes: 9 additions & 23 deletions src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,41 +1,27 @@
namespace KafkaFlow.BatchConsume
{
using System;
using System.Collections.Generic;

internal class BatchConsumeMessageContext : IMessageContext
{
public BatchConsumeMessageContext(
int workerId,
string groupId,
IMessageContextConsumer consumer,
IConsumerContext consumer,
IReadOnlyCollection<IMessageContext> batchMessage)
{
this.WorkerId = workerId;
this.GroupId = groupId;
this.Consumer = consumer;
this.Message = batchMessage;
this.ConsumerContext = consumer;
this.Message = new Message(null, batchMessage);
}

public int WorkerId { get; }

public byte[] PartitionKey => null;

public object Message { get; private set; }
public Message Message { get; }

public IMessageHeaders Headers { get; } = new MessageHeaders();

public string Topic => null;

public int? Partition => null;

public long? Offset => null;

public string GroupId { get; }

public IMessageContextConsumer Consumer { get; }
public IConsumerContext ConsumerContext { get; }

public void TransformMessage(object message) => this.Message = message;
public IProducerContext ProducerContext => null;

public IMessageContext Clone() => (IMessageContext) this.MemberwiseClone();
public IMessageContext TransformMessage(object key, object value) =>
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message");
}
}
Loading