From b9a13fd87d7efcaaab2a59856c42e2d0d55ea9e6 Mon Sep 17 00:00:00 2001 From: numinex Date: Thu, 21 Sep 2023 22:28:02 +0200 Subject: [PATCH] Fixed background message batching mechanism version-0.0.12 --- Iggy_SDK.sln.DotSettings.user | 4 +- Iggy_SDK/Contracts/Http/MessageSendRequest.cs | 2 - .../Http/MessageSendRequestGeneric.cs | 10 +++ Iggy_SDK/MessageStream/IIggyPublisher.cs | 4 +- .../Implementations/HttpMessageStream.cs | 21 +++--- .../Implementations/TcpMessageStream.cs | 75 +++++++++---------- .../MessageSenderDispatcher.cs | 26 +++---- .../Tcp/IggyTcpPollMessagesFixture.cs | 23 ++++-- .../Tcp/IggyTcpSendMessagesFixture.cs | 6 -- Iggy_Sample_Consumer/Program.cs | 12 ++- Iggy_Sample_Producer/Program.cs | 64 +++------------- 11 files changed, 112 insertions(+), 135 deletions(-) create mode 100644 Iggy_SDK/Contracts/Http/MessageSendRequestGeneric.cs diff --git a/Iggy_SDK.sln.DotSettings.user b/Iggy_SDK.sln.DotSettings.user index 2ccf436..034b203 100644 --- a/Iggy_SDK.sln.DotSettings.user +++ b/Iggy_SDK.sln.DotSettings.user @@ -292,10 +292,10 @@ <SessionState ContinuousTestingMode="0" Name="PollMessagesE2ETcp #7" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> <Project Location="/home/numinex/projects/iggy-dotnet-client/Iggy_SDK_Tests" Presentation="&lt;Iggy_SDK_Tests&gt;" /> </SessionState> - <SessionState ContinuousTestingMode="0" IsActive="True" Name="Test #6" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> + <SessionState ContinuousTestingMode="0" Name="Test #6" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> <Project Location="/home/numinex/projects/iggy-dotnet-client/Iggy_SDK_Tests" Presentation="&lt;Iggy_SDK_Tests&gt;" /> </SessionState> - <SessionState ContinuousTestingMode="0" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> + <SessionState ContinuousTestingMode="0" IsActive="True" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> <Project Location="/home/numinex/projects/iggy-dotnet-client/Iggy_SDK_Tests" Presentation="&lt;Iggy_SDK_Tests&gt;" /> </SessionState> diff --git a/Iggy_SDK/Contracts/Http/MessageSendRequest.cs b/Iggy_SDK/Contracts/Http/MessageSendRequest.cs index 45db714..d41139b 100644 --- a/Iggy_SDK/Contracts/Http/MessageSendRequest.cs +++ b/Iggy_SDK/Contracts/Http/MessageSendRequest.cs @@ -1,7 +1,5 @@ -using Iggy_SDK.JsonConfiguration; using Iggy_SDK.Kinds; using Iggy_SDK.Messages; -using System.Text.Json.Serialization; namespace Iggy_SDK.Contracts.Http; diff --git a/Iggy_SDK/Contracts/Http/MessageSendRequestGeneric.cs b/Iggy_SDK/Contracts/Http/MessageSendRequestGeneric.cs new file mode 100644 index 0000000..291c102 --- /dev/null +++ b/Iggy_SDK/Contracts/Http/MessageSendRequestGeneric.cs @@ -0,0 +1,10 @@ +using Iggy_SDK.Kinds; +namespace Iggy_SDK.Contracts.Http; + +public sealed class MessageSendRequest +{ + public required Identifier StreamId { get; init; } + public required Identifier TopicId { get; init; } + public required Partitioning Partitioning { get; init; } + public required IList Messages { get; init; } +} \ No newline at end of file diff --git a/Iggy_SDK/MessageStream/IIggyPublisher.cs b/Iggy_SDK/MessageStream/IIggyPublisher.cs index f4ef9c9..02d3237 100644 --- a/Iggy_SDK/MessageStream/IIggyPublisher.cs +++ b/Iggy_SDK/MessageStream/IIggyPublisher.cs @@ -8,8 +8,8 @@ public interface IIggyPublisher Task SendMessagesAsync(MessageSendRequest request, Func? encryptor = null, CancellationToken token = default); //TODO - should I create a MessageSendRequest to clean up function arguments ? - Task SendMessagesAsync(Identifier streamId, Identifier topicId, Partitioning partitioning, - IList messages, Func serializer, + Task SendMessagesAsync(MessageSendRequest request, + Func serializer, Func? encryptor = null, Dictionary? headers = null, CancellationToken token = default); } \ No newline at end of file diff --git a/Iggy_SDK/MessageStream/Implementations/HttpMessageStream.cs b/Iggy_SDK/MessageStream/Implementations/HttpMessageStream.cs index a2d09cd..9c52bbf 100644 --- a/Iggy_SDK/MessageStream/Implementations/HttpMessageStream.cs +++ b/Iggy_SDK/MessageStream/Implementations/HttpMessageStream.cs @@ -172,17 +172,18 @@ public async Task SendMessagesAsync(MessageSendRequest request, await _channel!.Writer.WriteAsync(request, token); } - public async Task SendMessagesAsync(Identifier streamId, Identifier topicId, Partitioning partitioning, - IList messages, Func serializer, + public async Task SendMessagesAsync(MessageSendRequest request, + Func serializer, Func? encryptor = null, Dictionary? headers = null, CancellationToken token = default) { + var messages = request.Messages; //TODO - maybe get rid of this closure ? - var request = new MessageSendRequest + var sendRequest = new MessageSendRequest { - StreamId = streamId, - TopicId = topicId, - Partitioning = partitioning, + StreamId = request.StreamId, + TopicId = request.TopicId, + Partitioning = request.Partitioning, Messages = messages.Select(message => new Message { Id = Guid.NewGuid(), @@ -195,17 +196,17 @@ public async Task SendMessagesAsync(Identifier streamId, Identifier to { try { - await _messageInvoker.SendMessagesAsync(request, token); + await _messageInvoker.SendMessagesAsync(sendRequest, token); } catch { - var partId = BinaryPrimitives.ReadInt32LittleEndian(request.Partitioning.Value); + var partId = BinaryPrimitives.ReadInt32LittleEndian(sendRequest.Partitioning.Value); _logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}", - request.StreamId, request.TopicId, partId); + sendRequest.StreamId, sendRequest.TopicId, partId); } return; } - await _channel!.Writer.WriteAsync(request, token); + await _channel!.Writer.WriteAsync(sendRequest, token); } public async Task FetchMessagesAsync(MessageFetchRequest request, diff --git a/Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs b/Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs index cfac226..1267457 100644 --- a/Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs +++ b/Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs @@ -271,6 +271,7 @@ public async Task SendMessagesAsync(MessageSendRequest request, request.Messages[i] = request.Messages[i] with { Payload = encryptor(request.Messages[i].Payload) }; } } + if (_messageInvoker is not null) { try @@ -285,68 +286,60 @@ public async Task SendMessagesAsync(MessageSendRequest request, } return; } - await _channel!.Writer.WriteAsync(request, token); } - public async Task SendMessagesAsync(Identifier streamId, Identifier topicId, Partitioning partitioning, - IList messages, Func serializer, + public async Task SendMessagesAsync(MessageSendRequest request, + Func serializer, Func? encryptor = null, Dictionary? headers = null, CancellationToken token = default) { + var messages = request.Messages; if (messages.Count == 0) { return; } //TODO - explore making fields of Message class mutable, so there is no need to create em from scratch - var messagesPool = MemoryPool.Shared.Rent(messages.Count); - var messagesBuffer = messagesPool.Memory; - try + var messagesBuffer = new Message[messages.Count]; + for (var i = 0; i < messages.Count || token.IsCancellationRequested; i++) { - for (var i = 0; i < messages.Count || token.IsCancellationRequested; i++) + messagesBuffer[i] = new Message { - messagesBuffer.Span[i] = new Message - { - Payload = encryptor is not null ? encryptor(serializer(messages[i])) : serializer(messages[i]), - Headers = headers, - Id = Guid.NewGuid() - }; - } + Payload = encryptor is not null ? encryptor(serializer(messages[i])) : serializer(messages[i]), + Headers = headers, + Id = Guid.NewGuid() + }; + } + + var sendRequest = new MessageSendRequest + { + StreamId = request.StreamId, + TopicId = request.TopicId, + Partitioning = request.Partitioning, + Messages = messagesBuffer + }; - var request = new MessageSendRequest + if (_messageInvoker is not null) + { + try { - StreamId = streamId, - TopicId = topicId, - Partitioning = partitioning, - Messages = messagesBuffer.Span[..messages.Count].ToArray() - }; - - if (_messageInvoker is not null) + await _messageInvoker.SendMessagesAsync(sendRequest, token); + } + catch { - try - { - await _messageInvoker.SendMessagesAsync(request, token); - } - catch - { - var partId = BinaryPrimitives.ReadInt32LittleEndian(request.Partitioning.Value); - _logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}", - request.StreamId, request.TopicId, partId); - } - return; + var partId = BinaryPrimitives.ReadInt32LittleEndian(sendRequest.Partitioning.Value); + _logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}", + request.StreamId, request.TopicId, partId); } - await _channel!.Writer.WriteAsync(request, token); - } - finally - { - messagesPool.Dispose(); + return; } + await _channel!.Writer.WriteAsync(sendRequest, token); } public async Task> FetchMessagesAsync(MessageFetchRequest request, Func serializer, Func? decryptor = null, CancellationToken token = default) { - await SendMessagesPayload(request, token); + await SendFetchMessagesRequestPayload(request, token); var buffer = MemoryPool.Shared.Rent(BufferSizes.ExpectedResponseSize); try { @@ -471,7 +464,7 @@ private async Task StartPollingMessagesAsync(MessageFetchRequest reque public async Task FetchMessagesAsync(MessageFetchRequest request, Func? decryptor = null, CancellationToken token = default) { - await SendMessagesPayload(request, token); + await SendFetchMessagesRequestPayload(request, token); var buffer = ArrayPool.Shared.Rent(BufferSizes.ExpectedResponseSize); try { @@ -507,7 +500,7 @@ public async Task FetchMessagesAsync(MessageFetchRequest request ArrayPool.Shared.Return(buffer); } } - private async Task SendMessagesPayload(MessageFetchRequest request, CancellationToken token) + private async Task SendFetchMessagesRequestPayload(MessageFetchRequest request, CancellationToken token) { var messageBufferSize = CalculateMessageBufferSize(request); var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize); diff --git a/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs b/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs index 14fab93..9223b79 100644 --- a/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs +++ b/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs @@ -16,11 +16,11 @@ internal sealed class MessageSenderDispatcher private Task? _timerTask; private readonly CancellationTokenSource _cts = new(); private readonly IMessageInvoker _messageInvoker; - private readonly Channel? _channel; + private readonly Channel _channel; private readonly int _maxMessages; private readonly int _maxRequestsInPoll; - internal MessageSenderDispatcher(IntervalBatchingSettings sendMessagesOptions, Channel? channel, + internal MessageSenderDispatcher(IntervalBatchingSettings sendMessagesOptions, Channel channel, IMessageInvoker messageInvoker, ILoggerFactory loggerFactory) { _timer = new (sendMessagesOptions.Interval); @@ -34,15 +34,15 @@ internal void Start() { _timerTask = SendMessages(); } - private async Task SendMessages() + internal async Task SendMessages() { - var messagesSendRequests = MemoryPool.Shared.Rent(_maxRequestsInPoll); + var messagesSendRequests = new MessageSendRequest[_maxRequestsInPoll]; while (await _timer.WaitForNextTickAsync(_cts.Token)) { int idx = 0; - while (_channel!.Reader.TryRead(out var msg)) + while (_channel.Reader.TryRead(out var msg)) { - messagesSendRequests.Memory.Span[idx++] = msg; + messagesSendRequests[idx++] = msg; } if (idx == 0) @@ -50,27 +50,27 @@ private async Task SendMessages() continue; } - var canBatchMessages = CanBatchMessages(messagesSendRequests.Memory.Span[..idx]); + var canBatchMessages = CanBatchMessages(messagesSendRequests.AsSpan()[..idx]); if (!canBatchMessages) { for (int i = 0; i < idx; i++) { try { - await _messageInvoker.SendMessagesAsync(messagesSendRequests.Memory.Span[i], token: _cts.Token); + await _messageInvoker.SendMessagesAsync(messagesSendRequests[i], token: _cts.Token); } catch { - var partId = BinaryPrimitives.ReadInt32LittleEndian(messagesSendRequests.Memory.Span[i].Partitioning.Value); + var partId = BinaryPrimitives.ReadInt32LittleEndian(messagesSendRequests[i].Partitioning.Value); _logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}", - messagesSendRequests.Memory.Span[i].StreamId, messagesSendRequests.Memory.Span[i].TopicId, partId); + messagesSendRequests[i].StreamId, messagesSendRequests[i].TopicId, partId); } } continue; } - var messagesBatches = BatchMessages(messagesSendRequests.Memory.Span[..idx]); + var messagesBatches = BatchMessages(messagesSendRequests.AsSpan()[..idx]); foreach (var messages in messagesBatches) { try @@ -152,8 +152,9 @@ private MessageSendRequest[] BatchMessages(Span requests) TopicId = requests[0].TopicId, Messages = messages[..idx].ToArray() }; - messagesBatchesBuffer.Memory.Span[batchCounter] = messageSendRequest; + messagesBatchesBuffer.Memory.Span[batchCounter++] = messageSendRequest; } + return messagesBatchesBuffer.Memory.Span[..batchCounter].ToArray(); } finally { @@ -161,7 +162,6 @@ private MessageSendRequest[] BatchMessages(Span requests) messagesBatchesBuffer.Dispose(); } - return messagesBatchesBuffer.Memory.Span[..batchCounter].ToArray(); } internal async Task StopAsync() { diff --git a/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpPollMessagesFixture.cs b/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpPollMessagesFixture.cs index bf7364b..74768b3 100644 --- a/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpPollMessagesFixture.cs +++ b/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpPollMessagesFixture.cs @@ -1,5 +1,6 @@ using DotNet.Testcontainers.Builders; using Iggy_SDK; +using Iggy_SDK_Tests.Utils.DummyObj; using Iggy_SDK.Contracts.Http; using Iggy_SDK.Enums; using Iggy_SDK.Factory; @@ -59,13 +60,25 @@ public async Task InitializeAsync() await sut.CreateTopicAsync(Identifier.Numeric(StreamRequest.StreamId), TopicRequest); await sut.CreateTopicAsync(Identifier.Numeric(StreamRequest.StreamId), HeadersTopicRequest); - await sut.SendMessagesAsync(Identifier.Numeric(StreamId), Identifier.Numeric(TopicId), - Partitioning.PartitionId(PartitionId), MessageFactory.GenerateDummyMessages(MessageCount), MessageFactory.Serializer); - await sut.SendMessagesAsync(Identifier.Numeric(StreamId), Identifier.Numeric(HeadersTopicId), - Partitioning.PartitionId(PartitionId), MessageFactory.GenerateDummyMessages(MessageCount), MessageFactory.Serializer, + await sut.SendMessagesAsync(new MessageSendRequest + { + Messages = MessageFactory.GenerateDummyMessages(MessageCount), + Partitioning = Partitioning.PartitionId(PartitionId), + StreamId = Identifier.Numeric(StreamId), + TopicId = Identifier.Numeric(TopicId), + }, + MessageFactory.Serializer); + + await sut.SendMessagesAsync(new MessageSendRequest + { + Messages = MessageFactory.GenerateDummyMessages(MessageCount), + Partitioning = Partitioning.PartitionId(PartitionId), + StreamId = Identifier.Numeric(StreamId), + TopicId = Identifier.Numeric(TopicId), + }, + MessageFactory.Serializer, headers: MessageFactory.GenerateMessageHeaders(HeadersCount)); - //await Task.Delay(2500); } diff --git a/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpSendMessagesFixture.cs b/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpSendMessagesFixture.cs index d2a8921..6c7dce2 100644 --- a/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpSendMessagesFixture.cs +++ b/Iggy_SDK_Tests/E2ETests/Fixtures/Tcp/IggyTcpSendMessagesFixture.cs @@ -47,12 +47,6 @@ public async Task InitializeAsync() var channel = Channel.CreateUnbounded(); - var sendMessagesOptions = new IntervalBatchingSettings - { - Enabled = true, - Interval = TimeSpan.FromMilliseconds(1), - MaxMessagesPerBatch = 1000 - }; var loggerFactory = NullLoggerFactory.Instance; sut = new TcpMessageInvoker(socket); var messageStream = new TcpMessageStream(socket, channel, loggerFactory, sut); diff --git a/Iggy_Sample_Consumer/Program.cs b/Iggy_Sample_Consumer/Program.cs index 6eb1f66..d8f33c2 100644 --- a/Iggy_Sample_Consumer/Program.cs +++ b/Iggy_Sample_Consumer/Program.cs @@ -12,11 +12,19 @@ var jsonOptions = new JsonSerializerOptions(); jsonOptions.PropertyNamingPolicy = new ToSnakeCaseNamingPolicy(); jsonOptions.WriteIndented = true; -var protocol = Protocol.Http; +var protocol = Protocol.Tcp; var bus = MessageStreamFactory.CreateMessageStream(options => { - options.BaseAdress = "http://127.0.0.1:3000"; + options.BaseAdress = "127.0.0.1:8090"; options.Protocol = protocol; + + options.IntervalBatchingConfig = x => + { + x.Enabled = true; + x.Interval = TimeSpan.FromMilliseconds(100); + x.MaxMessagesPerBatch = 1000; + x.MaxRequests = 4096; + }; }); Console.WriteLine("Using protocol : {0}", protocol.ToString()); diff --git a/Iggy_Sample_Producer/Program.cs b/Iggy_Sample_Producer/Program.cs index 90b2cea..1f40cc9 100644 --- a/Iggy_Sample_Producer/Program.cs +++ b/Iggy_Sample_Producer/Program.cs @@ -12,65 +12,19 @@ using System.Security.Cryptography; using System.Text; -var protocol = Protocol.Http; +var protocol = Protocol.Tcp; var bus = MessageStreamFactory.CreateMessageStream(options => { - options.BaseAdress = "http://localhost:3000"; + options.BaseAdress = "127.0.0.1:8090"; options.Protocol = protocol; options.IntervalBatchingConfig = x => { + x.Enabled = true; + x.Interval = TimeSpan.FromMilliseconds(101); x.MaxMessagesPerBatch = 1000; - x.Interval = TimeSpan.FromMilliseconds(100); + x.MaxRequests = 4096; }; }); -var topicsPermissions = new Dictionary(); -topicsPermissions.Add( - 1, new TopicPermissions - { - ManageTopic = true, - PollMessages = true, - ReadTopic = false, - SendMessages = false - }); -var streamPermission = new Dictionary(); -streamPermission.Add( - 1, new StreamPermissions - { - ManageStream = true, - ManageTopics = false, - PollMessages = true, - ReadStream = false, - ReadTopics = true, - SendMessages = true, - Topics = topicsPermissions - }); -/* -await bus.CreateUser(new CreateUserRequest -{ - Username = "user_new", - Password = "newuser", - Status = UserStatus.Active, - Permissions = new Permissions - { - Global = new GlobalPermissions - { - ManageServers = true, - ReadServers = true, - ManageUsers = true, - ReadUsers = true, - ManageStreams = false, - ManageTopics = false, - PollMessages = false, - SendMessages = false, - ReadStreams = true, - ReadTopics = true - }, - Streams = streamPermission - } - -}); -*/ -await bus.LogoutUser(); Console.WriteLine("Using protocol : {0}", protocol.ToString()); var streamIdVal = 1; var topicIdVal = 1; @@ -185,7 +139,13 @@ async Task ProduceMessages(IIggyClient bus, StreamResponse? stream, TopicRespons try { - await bus.SendMessagesAsync(streamId, topicId, Partitioning.PartitionId(3), messages, + await bus.SendMessagesAsync(new MessageSendRequest + { + StreamId = streamId, + TopicId = topicId, + Partitioning = Partitioning.PartitionId(3), + Messages = messages + }, serializer, encryptor, headers); }