Skip to content

Commit

Permalink
Fixed background message batching mechanism version-0.0.12
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex committed Sep 21, 2023
1 parent 83a9ad5 commit b9a13fd
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 135 deletions.
4 changes: 2 additions & 2 deletions Iggy_SDK.sln.DotSettings.user
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,10 @@
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=f56f371c_002D0b48_002D40df_002D90cb_002D314567639646/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="PollMessagesE2ETcp #7" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Project Location="/home/numinex/projects/iggy-dotnet-client/Iggy_SDK_Tests" Presentation="&amp;lt;Iggy_SDK_Tests&amp;gt;" /&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fb39e0ed_002D9ae7_002D42be_002D87b8_002Dc466e2675de2/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="Test #6" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fb39e0ed_002D9ae7_002D42be_002D87b8_002Dc466e2675de2/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="Test #6" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Project Location="/home/numinex/projects/iggy-dotnet-client/Iggy_SDK_Tests" Presentation="&amp;lt;Iggy_SDK_Tests&amp;gt;" /&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fdae582f_002D649c_002D46a6_002D8ad5_002D876f70bf8246/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fdae582f_002D649c_002D46a6_002D8ad5_002D876f70bf8246/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Project Location="/home/numinex/projects/iggy-dotnet-client/Iggy_SDK_Tests" Presentation="&amp;lt;Iggy_SDK_Tests&amp;gt;" /&gt;
&lt;/SessionState&gt;</s:String></wpf:ResourceDictionary>

2 changes: 0 additions & 2 deletions Iggy_SDK/Contracts/Http/MessageSendRequest.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Iggy_SDK.JsonConfiguration;
using Iggy_SDK.Kinds;
using Iggy_SDK.Messages;
using System.Text.Json.Serialization;

namespace Iggy_SDK.Contracts.Http;

Expand Down
10 changes: 10 additions & 0 deletions Iggy_SDK/Contracts/Http/MessageSendRequestGeneric.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Iggy_SDK.Kinds;
namespace Iggy_SDK.Contracts.Http;

public sealed class MessageSendRequest<TMessage>
{
public required Identifier StreamId { get; init; }
public required Identifier TopicId { get; init; }
public required Partitioning Partitioning { get; init; }
public required IList<TMessage> Messages { get; init; }
}
4 changes: 2 additions & 2 deletions Iggy_SDK/MessageStream/IIggyPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ public interface IIggyPublisher
Task SendMessagesAsync(MessageSendRequest request, Func<byte[], byte[]>?
encryptor = null, CancellationToken token = default);
//TODO - should I create a MessageSendRequest<TMessage> to clean up function arguments ?
Task SendMessagesAsync<TMessage>(Identifier streamId, Identifier topicId, Partitioning partitioning,
IList<TMessage> messages, Func<TMessage, byte[]> serializer,
Task SendMessagesAsync<TMessage>(MessageSendRequest<TMessage> request,
Func<TMessage, byte[]> serializer,
Func<byte[], byte[]>? encryptor = null, Dictionary<HeaderKey, HeaderValue>? headers = null,
CancellationToken token = default);
}
21 changes: 11 additions & 10 deletions Iggy_SDK/MessageStream/Implementations/HttpMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,18 @@ public async Task SendMessagesAsync(MessageSendRequest request,
await _channel!.Writer.WriteAsync(request, token);
}

public async Task SendMessagesAsync<TMessage>(Identifier streamId, Identifier topicId, Partitioning partitioning,
IList<TMessage> messages, Func<TMessage, byte[]> serializer,
public async Task SendMessagesAsync<TMessage>(MessageSendRequest<TMessage> request,
Func<TMessage, byte[]> serializer,
Func<byte[], byte[]>? encryptor = null, Dictionary<HeaderKey, HeaderValue>? headers = null,
CancellationToken token = default)
{
var messages = request.Messages;
//TODO - maybe get rid of this closure ?
var request = new MessageSendRequest
var sendRequest = new MessageSendRequest
{
StreamId = streamId,
TopicId = topicId,
Partitioning = partitioning,
StreamId = request.StreamId,
TopicId = request.TopicId,
Partitioning = request.Partitioning,
Messages = messages.Select(message => new Message
{
Id = Guid.NewGuid(),
Expand All @@ -195,17 +196,17 @@ public async Task SendMessagesAsync<TMessage>(Identifier streamId, Identifier to
{
try
{
await _messageInvoker.SendMessagesAsync(request, token);
await _messageInvoker.SendMessagesAsync(sendRequest, token);
}
catch
{
var partId = BinaryPrimitives.ReadInt32LittleEndian(request.Partitioning.Value);
var partId = BinaryPrimitives.ReadInt32LittleEndian(sendRequest.Partitioning.Value);
_logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}",
request.StreamId, request.TopicId, partId);
sendRequest.StreamId, sendRequest.TopicId, partId);
}
return;
}
await _channel!.Writer.WriteAsync(request, token);
await _channel!.Writer.WriteAsync(sendRequest, token);
}

public async Task<PolledMessages> FetchMessagesAsync(MessageFetchRequest request,
Expand Down
75 changes: 34 additions & 41 deletions Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ public async Task SendMessagesAsync(MessageSendRequest request,
request.Messages[i] = request.Messages[i] with { Payload = encryptor(request.Messages[i].Payload) };
}
}

if (_messageInvoker is not null)
{
try
Expand All @@ -285,68 +286,60 @@ public async Task SendMessagesAsync(MessageSendRequest request,
}
return;
}

await _channel!.Writer.WriteAsync(request, token);
}
public async Task SendMessagesAsync<TMessage>(Identifier streamId, Identifier topicId, Partitioning partitioning,
IList<TMessage> messages, Func<TMessage, byte[]> serializer,
public async Task SendMessagesAsync<TMessage>(MessageSendRequest<TMessage> request,
Func<TMessage, byte[]> serializer,
Func<byte[], byte[]>? encryptor = null, Dictionary<HeaderKey, HeaderValue>? headers = null,
CancellationToken token = default)
{
var messages = request.Messages;
if (messages.Count == 0)
{
return;
}

//TODO - explore making fields of Message class mutable, so there is no need to create em from scratch
var messagesPool = MemoryPool<Message>.Shared.Rent(messages.Count);
var messagesBuffer = messagesPool.Memory;
try
var messagesBuffer = new Message[messages.Count];
for (var i = 0; i < messages.Count || token.IsCancellationRequested; i++)
{
for (var i = 0; i < messages.Count || token.IsCancellationRequested; i++)
messagesBuffer[i] = new Message
{
messagesBuffer.Span[i] = new Message
{
Payload = encryptor is not null ? encryptor(serializer(messages[i])) : serializer(messages[i]),
Headers = headers,
Id = Guid.NewGuid()
};
}
Payload = encryptor is not null ? encryptor(serializer(messages[i])) : serializer(messages[i]),
Headers = headers,
Id = Guid.NewGuid()
};
}

var sendRequest = new MessageSendRequest
{
StreamId = request.StreamId,
TopicId = request.TopicId,
Partitioning = request.Partitioning,
Messages = messagesBuffer
};

var request = new MessageSendRequest
if (_messageInvoker is not null)
{
try
{
StreamId = streamId,
TopicId = topicId,
Partitioning = partitioning,
Messages = messagesBuffer.Span[..messages.Count].ToArray()
};

if (_messageInvoker is not null)
await _messageInvoker.SendMessagesAsync(sendRequest, token);
}
catch
{
try
{
await _messageInvoker.SendMessagesAsync(request, token);
}
catch
{
var partId = BinaryPrimitives.ReadInt32LittleEndian(request.Partitioning.Value);
_logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}",
request.StreamId, request.TopicId, partId);
}
return;
var partId = BinaryPrimitives.ReadInt32LittleEndian(sendRequest.Partitioning.Value);
_logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}",
request.StreamId, request.TopicId, partId);
}
await _channel!.Writer.WriteAsync(request, token);
}
finally
{
messagesPool.Dispose();
return;
}
await _channel!.Writer.WriteAsync(sendRequest, token);
}

public async Task<PolledMessages<TMessage>> FetchMessagesAsync<TMessage>(MessageFetchRequest request,
Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default)
{
await SendMessagesPayload(request, token);
await SendFetchMessagesRequestPayload(request, token);
var buffer = MemoryPool<byte>.Shared.Rent(BufferSizes.ExpectedResponseSize);
try
{
Expand Down Expand Up @@ -471,7 +464,7 @@ private async Task StartPollingMessagesAsync<TMessage>(MessageFetchRequest reque
public async Task<PolledMessages> FetchMessagesAsync(MessageFetchRequest request,
Func<byte[], byte[]>? decryptor = null, CancellationToken token = default)
{
await SendMessagesPayload(request, token);
await SendFetchMessagesRequestPayload(request, token);
var buffer = ArrayPool<byte>.Shared.Rent(BufferSizes.ExpectedResponseSize);
try
{
Expand Down Expand Up @@ -507,7 +500,7 @@ public async Task<PolledMessages> FetchMessagesAsync(MessageFetchRequest request
ArrayPool<byte>.Shared.Return(buffer);
}
}
private async Task SendMessagesPayload(MessageFetchRequest request, CancellationToken token)
private async Task SendFetchMessagesRequestPayload(MessageFetchRequest request, CancellationToken token)
{
var messageBufferSize = CalculateMessageBufferSize(request);
var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
Expand Down
26 changes: 13 additions & 13 deletions Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ internal sealed class MessageSenderDispatcher
private Task? _timerTask;
private readonly CancellationTokenSource _cts = new();
private readonly IMessageInvoker _messageInvoker;
private readonly Channel<MessageSendRequest>? _channel;
private readonly Channel<MessageSendRequest> _channel;
private readonly int _maxMessages;
private readonly int _maxRequestsInPoll;

internal MessageSenderDispatcher(IntervalBatchingSettings sendMessagesOptions, Channel<MessageSendRequest>? channel,
internal MessageSenderDispatcher(IntervalBatchingSettings sendMessagesOptions, Channel<MessageSendRequest> channel,
IMessageInvoker messageInvoker, ILoggerFactory loggerFactory)
{
_timer = new (sendMessagesOptions.Interval);
Expand All @@ -34,43 +34,43 @@ internal void Start()
{
_timerTask = SendMessages();
}
private async Task SendMessages()
internal async Task SendMessages()
{
var messagesSendRequests = MemoryPool<MessageSendRequest>.Shared.Rent(_maxRequestsInPoll);
var messagesSendRequests = new MessageSendRequest[_maxRequestsInPoll];
while (await _timer.WaitForNextTickAsync(_cts.Token))
{
int idx = 0;
while (_channel!.Reader.TryRead(out var msg))
while (_channel.Reader.TryRead(out var msg))
{
messagesSendRequests.Memory.Span[idx++] = msg;
messagesSendRequests[idx++] = msg;
}

if (idx == 0)
{
continue;
}

var canBatchMessages = CanBatchMessages(messagesSendRequests.Memory.Span[..idx]);
var canBatchMessages = CanBatchMessages(messagesSendRequests.AsSpan()[..idx]);
if (!canBatchMessages)
{
for (int i = 0; i < idx; i++)
{
try
{
await _messageInvoker.SendMessagesAsync(messagesSendRequests.Memory.Span[i], token: _cts.Token);
await _messageInvoker.SendMessagesAsync(messagesSendRequests[i], token: _cts.Token);
}
catch
{
var partId = BinaryPrimitives.ReadInt32LittleEndian(messagesSendRequests.Memory.Span[i].Partitioning.Value);
var partId = BinaryPrimitives.ReadInt32LittleEndian(messagesSendRequests[i].Partitioning.Value);
_logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}",
messagesSendRequests.Memory.Span[i].StreamId, messagesSendRequests.Memory.Span[i].TopicId, partId);
messagesSendRequests[i].StreamId, messagesSendRequests[i].TopicId, partId);
}
}

continue;
}

var messagesBatches = BatchMessages(messagesSendRequests.Memory.Span[..idx]);
var messagesBatches = BatchMessages(messagesSendRequests.AsSpan()[..idx]);
foreach (var messages in messagesBatches)
{
try
Expand Down Expand Up @@ -152,16 +152,16 @@ private MessageSendRequest[] BatchMessages(Span<MessageSendRequest> requests)
TopicId = requests[0].TopicId,
Messages = messages[..idx].ToArray()
};
messagesBatchesBuffer.Memory.Span[batchCounter] = messageSendRequest;
messagesBatchesBuffer.Memory.Span[batchCounter++] = messageSendRequest;
}
return messagesBatchesBuffer.Memory.Span[..batchCounter].ToArray();
}
finally
{
ArrayPool<Message>.Shared.Return(messagesBuffer);
messagesBatchesBuffer.Dispose();
}

return messagesBatchesBuffer.Memory.Span[..batchCounter].ToArray();
}
internal async Task StopAsync()
{
Expand Down
23 changes: 18 additions & 5 deletions Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpPollMessagesFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using DotNet.Testcontainers.Builders;
using Iggy_SDK;
using Iggy_SDK_Tests.Utils.DummyObj;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Factory;
Expand Down Expand Up @@ -59,13 +60,25 @@ public async Task InitializeAsync()
await sut.CreateTopicAsync(Identifier.Numeric(StreamRequest.StreamId), TopicRequest);
await sut.CreateTopicAsync(Identifier.Numeric(StreamRequest.StreamId), HeadersTopicRequest);

await sut.SendMessagesAsync(Identifier.Numeric(StreamId), Identifier.Numeric(TopicId),
Partitioning.PartitionId(PartitionId), MessageFactory.GenerateDummyMessages(MessageCount), MessageFactory.Serializer);

await sut.SendMessagesAsync(Identifier.Numeric(StreamId), Identifier.Numeric(HeadersTopicId),
Partitioning.PartitionId(PartitionId), MessageFactory.GenerateDummyMessages(MessageCount), MessageFactory.Serializer,
await sut.SendMessagesAsync(new MessageSendRequest<DummyMessage>
{
Messages = MessageFactory.GenerateDummyMessages(MessageCount),
Partitioning = Partitioning.PartitionId(PartitionId),
StreamId = Identifier.Numeric(StreamId),
TopicId = Identifier.Numeric(TopicId),
},
MessageFactory.Serializer);

await sut.SendMessagesAsync(new MessageSendRequest<DummyMessage>
{
Messages = MessageFactory.GenerateDummyMessages(MessageCount),
Partitioning = Partitioning.PartitionId(PartitionId),
StreamId = Identifier.Numeric(StreamId),
TopicId = Identifier.Numeric(TopicId),
},
MessageFactory.Serializer,
headers: MessageFactory.GenerateMessageHeaders(HeadersCount));

//await Task.Delay(2500);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ public async Task InitializeAsync()

var channel = Channel.CreateUnbounded<MessageSendRequest>();

var sendMessagesOptions = new IntervalBatchingSettings
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(1),
MaxMessagesPerBatch = 1000
};
var loggerFactory = NullLoggerFactory.Instance;
sut = new TcpMessageInvoker(socket);
var messageStream = new TcpMessageStream(socket, channel, loggerFactory, sut);
Expand Down
12 changes: 10 additions & 2 deletions Iggy_Sample_Consumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@
var jsonOptions = new JsonSerializerOptions();
jsonOptions.PropertyNamingPolicy = new ToSnakeCaseNamingPolicy();
jsonOptions.WriteIndented = true;
var protocol = Protocol.Http;
var protocol = Protocol.Tcp;
var bus = MessageStreamFactory.CreateMessageStream(options =>
{
options.BaseAdress = "http://127.0.0.1:3000";
options.BaseAdress = "127.0.0.1:8090";
options.Protocol = protocol;
options.IntervalBatchingConfig = x =>
{
x.Enabled = true;
x.Interval = TimeSpan.FromMilliseconds(100);
x.MaxMessagesPerBatch = 1000;
x.MaxRequests = 4096;
};
});

Console.WriteLine("Using protocol : {0}", protocol.ToString());
Expand Down
Loading

0 comments on commit b9a13fd

Please sign in to comment.