From 2a0c7853c694a7ad2cd66c28a148b22bb140c184 Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Wed, 21 Feb 2024 15:34:07 +0200 Subject: [PATCH 1/8] v1 --- NBB.Messaging.slnf | 1 + .../IMessageBusSubscriber.cs | 34 ++-- .../IMessagingTransport.cs | 7 +- .../MessageBusSubscriber.cs | 8 +- .../ExceptionHandlingMiddleware.cs | 4 +- .../Internal/InProcessMessagingTransport.cs | 2 +- .../JetStreamMessagingTransport.cs | 6 +- .../StanMessagingTransport.cs | 2 +- .../NoopMessagingTransport.cs | 2 +- .../RusiMessagingTransport.cs | 6 +- .../SubscriberTests.cs | 162 +++++++++--------- 11 files changed, 121 insertions(+), 113 deletions(-) diff --git a/NBB.Messaging.slnf b/NBB.Messaging.slnf index efc5ecfc..57d0d81c 100644 --- a/NBB.Messaging.slnf +++ b/NBB.Messaging.slnf @@ -10,6 +10,7 @@ "src\\Messaging\\NBB.Messaging.Nats\\NBB.Messaging.Nats.csproj", "src\\Messaging\\NBB.Messaging.Noop\\NBB.Messaging.Noop.csproj", "src\\Messaging\\NBB.Messaging.Rusi\\NBB.Messaging.Rusi.csproj", + "src\\Messaging\\NBB.Messaging.JetStream\\NBB.Messaging.JetStream.csproj", "test\\UnitTests\\Messaging\\NBB.Messaging.Abstractions.Tests\\NBB.Messaging.Abstractions.Tests.csproj", "test\\UnitTests\\Messaging\\NBB.Messaging.DataContracts.Tests\\NBB.Messaging.DataContracts.Tests.csproj", "test\\UnitTests\\Messaging\\NBB.Messaging.Effects.Tests\\NBB.Messaging.Effects.Tests.csproj", diff --git a/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs index 40c11f22..4d7a5021 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs @@ -1,20 +1,20 @@ // Copyright (c) TotalSoft. // This source code is licensed under the MIT license. -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace NBB.Messaging.Abstractions -{ - public interface IMessageBusSubscriber - { - Task SubscribeAsync(Func, Task> handler, - MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default); - - - Task SubscribeAsync(Func handler, - MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default) - => SubscribeAsync(handler, options, cancellationToken); - } -} \ No newline at end of file +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.Messaging.Abstractions +{ + public interface IMessageBusSubscriber + { + Task SubscribeAsync(Func, Task> handler, + MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default); + + + Task SubscribeAsync(Func handler, + MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default) + => SubscribeAsync(handler, options, cancellationToken); + } +} diff --git a/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs b/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs index 37402554..dc787a7a 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs @@ -18,7 +18,7 @@ public interface IMessagingTransport /// Subscription options /// /// An object that when disposed unsubscribes the handler from the topic - Task SubscribeAsync(string topic, Func handler, + Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default); /// @@ -45,6 +45,11 @@ public record TransportSendContext( public record TransportReceiveContext(TransportReceivedData ReceivedData); + public record PipelineResult(bool Success, string Error) + { + public static PipelineResult SuccessResult = new PipelineResult(true, null); + } + public abstract record TransportReceivedData { diff --git a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs index a7abbd8c..ddb68dfb 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs @@ -36,7 +36,7 @@ public async Task SubscribeAsync(Func MsgHandler(TransportReceiveContext receiveContext) { _logger.LogDebug("Messaging subscriber received message from subject {Subject}", topicName); @@ -53,6 +53,7 @@ async Task MsgHandler(TransportReceiveContext receiveContext) }; await handler(messageEnvelope); + return PipelineResult.SuccessResult; } catch (Exception ex) { @@ -60,10 +61,11 @@ async Task MsgHandler(TransportReceiveContext receiveContext) topicName); if (messageEnvelope != null) - _deadLetterQueue.Push(messageEnvelope, topicNameWithoutPrefix, ex); + _deadLetterQueue.Push(messageEnvelope, topicName, ex); else - _deadLetterQueue.Push(receiveContext.ReceivedData, topicNameWithoutPrefix, ex); + _deadLetterQueue.Push(receiveContext.ReceivedData, topicName, ex); + return new PipelineResult(false, ex.Message); } } diff --git a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs index 543fa95b..45ce3cd9 100644 --- a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs +++ b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs @@ -51,7 +51,9 @@ public async Task Invoke(MessagingContext context, CancellationToken cancellatio Activity.Current?.SetException(ex); Activity.Current?.SetStatus(Status.Error); - _deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex); + //_deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex); + + throw; } finally { diff --git a/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs b/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs index 6a7f68d4..331e2947 100644 --- a/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs @@ -23,7 +23,7 @@ public InProcessMessagingTransport(IStorage storage, ILogger SubscribeAsync(string topic, Func handler, + public async Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { diff --git a/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs b/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs index 2c32a87b..f33cf21c 100644 --- a/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs @@ -34,10 +34,8 @@ public Task PublishAsync(string topic, TransportSendContext sendContext, Cancell }); } - public Task SubscribeAsync(string topic, - Func handler, - SubscriptionTransportOptions options = null, - CancellationToken cancellationToken = default) + public Task SubscribeAsync(string topic, Func> handler, + SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { IDisposable consumer = null; diff --git a/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs b/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs index 94f06832..88074f17 100644 --- a/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs @@ -22,7 +22,7 @@ public StanMessagingTransport(StanConnectionProvider stanConnectionManager, IOpt _natsOptions = natsOptions; } - public Task SubscribeAsync(string topic, Func handler, + public Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { diff --git a/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs b/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs index 9df6dfe2..012fc7ad 100644 --- a/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs @@ -10,7 +10,7 @@ namespace NBB.Messaging.Noop { public class NoopMessagingTransport : IMessagingTransport { - public Task SubscribeAsync(string topic, Func handler, + public Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { diff --git a/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs b/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs index 89f0ae30..60673f3b 100644 --- a/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs @@ -59,7 +59,7 @@ public async Task PublishAsync(string topic, TransportSendContext sendContext, await _client.PublishAsync(request); } - public async Task SubscribeAsync(string topic, Func handler, + public async Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { var transport = options ?? SubscriptionTransportOptions.Default; @@ -108,8 +108,8 @@ public async Task SubscribeAsync(string topic, Func>(); - var mockServerStream = Mock.Of>(x => x.Current == msg); - Mock.Get(mockServerStream) - .Setup(m => m.MoveNext(default)) - .Returns(() => Task.FromResult(msgCount-- > 0)); - - var rusiClient = Mock.Of(); - Mock.Get(rusiClient) - .Setup(m => m.Subscribe(null, null, default)) - .Returns(new AsyncDuplexStreamingCall(mockClientStream, - mockServerStream, - Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { })); - - var topic = "topic"; - var subscriber = new RusiMessagingTransport(rusiClient, - new OptionsWrapper(new RusiOptions() { PubsubName = "pubsub1" }), null); - - var handler = Mock.Of>(); - - // Act - using var subscription = await subscriber.SubscribeAsync(topic, handler); - await Task.Delay(100); //TODO: use more reliable awaiting - - // Assert - Mock.Get(rusiClient) - .Verify(x => x.Subscribe(null, null, default)); - - Mock.Get(mockClientStream) - .Verify(x => x.WriteAsync( - It.Is(req => - req.SubscriptionRequest != null && req.SubscriptionRequest.Topic == topic)) - , Times.Once); - - Mock.Get(mockClientStream) - .Verify(x => x.WriteAsync( - It.Is(req => req.AckRequest != null)), Times.Exactly(3)); - - Mock.Get(handler) - .Verify( - handler => handler(It.Is(m => - Encoding.UTF8.GetString(((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData) - .PayloadBytes) == payload && - ((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData).headers["aaa"] == "bbb")), - Times.Exactly(3)); - } - - public class TestMessage - { - public string TestProp { set; get; } - } - } -} +using Grpc.Core; +using Microsoft.Extensions.Options; +using Moq; +using NBB.Messaging.Abstractions; +using Proto.V1; +using System; +using System.Threading.Tasks; +using Xunit; +using Google.Protobuf; +using System.Text; + +namespace NBB.Messaging.Rusi.Tests +{ + public class SubscriberTests + { + [Fact] + public async Task TestSubscriber() + { + // Arrange + var msgCount = 3; + var payload = "{\"TestProp\":\"test1\"}"; + var msg = new ReceivedMessage() + { + Metadata = { { "aaa", "bbb" } }, + Data = ByteString.CopyFromUtf8(payload) + }; + + var mockClientStream = Mock.Of>(); + var mockServerStream = Mock.Of>(x => x.Current == msg); + Mock.Get(mockServerStream) + .Setup(m => m.MoveNext(default)) + .Returns(() => Task.FromResult(msgCount-- > 0)); + + var rusiClient = Mock.Of(); + Mock.Get(rusiClient) + .Setup(m => m.Subscribe(null, null, default)) + .Returns(new AsyncDuplexStreamingCall(mockClientStream, + mockServerStream, + Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { })); + + var topic = "topic"; + var subscriber = new RusiMessagingTransport(rusiClient, + new OptionsWrapper(new RusiOptions() { PubsubName = "pubsub1" }), null); + + var handler = Mock.Of>>(); + + // Act + using var subscription = await subscriber.SubscribeAsync(topic, handler); + await Task.Delay(100); //TODO: use more reliable awaiting + + // Assert + Mock.Get(rusiClient) + .Verify(x => x.Subscribe(null, null, default)); + + Mock.Get(mockClientStream) + .Verify(x => x.WriteAsync( + It.Is(req => + req.SubscriptionRequest != null && req.SubscriptionRequest.Topic == topic)) + , Times.Once); + + Mock.Get(mockClientStream) + .Verify(x => x.WriteAsync( + It.Is(req => req.AckRequest != null)), Times.Exactly(3)); + + Mock.Get(handler) + .Verify( + handler => handler(It.Is(m => + Encoding.UTF8.GetString(((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData) + .PayloadBytes) == payload && + ((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData).headers["aaa"] == "bbb")), + Times.Exactly(3)); + } + + public class TestMessage + { + public string TestProp { set; get; } + } + } +} From 2260d76bca0b79448c5864806ab8bb2b5b647df4 Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Thu, 22 Feb 2024 12:23:40 +0200 Subject: [PATCH 2/8] v2 --- .gitignore | 28 +++++++++---------- .../MessageBusSubscriber.cs | 1 + 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 1b1a60e7..5be28dcb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,19 +1,19 @@ -################################################################################ -# This .gitignore file was automatically created by Microsoft(R) Visual Studio. -################################################################################ - -/.vs -/.idea -/artifacts -*/**/bin -*/**/obj -**/packages -*/**/*.csproj.user +################################################################################ +# This .gitignore file was automatically created by Microsoft(R) Visual Studio. +################################################################################ + +/.vs +/.idea +/artifacts +*/**/bin +*/**/obj +**/packages +*/**/*.csproj.user TestResults */**/BenchmarkDotNet.Artifacts **/*.DotSettings.user */**/appsettings.Development.json *.user -*.suo -.fake -.ionide \ No newline at end of file +*.suo +.fake +.ionide diff --git a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs index ddb68dfb..ae2c29dc 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs @@ -51,6 +51,7 @@ async Task MsgHandler(TransportReceiveContext receiveContext) => new MessagingEnvelope(headers, _messageSerDes.DeserializePayload(payloadBytes, headers, options?.SerDes)), _ => throw new Exception("Invalid received message data") }; + throw new Exception("Invalid received message data"); await handler(messageEnvelope); return PipelineResult.SuccessResult; From 30df899fdcc9d696ea6c4d80ac4928b744485a2c Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Thu, 22 Feb 2024 14:31:00 +0200 Subject: [PATCH 3/8] vv --- NBB.Messaging.slnf | 2 +- .../NBB.Messaging.Abstractions/MessageBusSubscriber.cs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/NBB.Messaging.slnf b/NBB.Messaging.slnf index 57d0d81c..ae350e15 100644 --- a/NBB.Messaging.slnf +++ b/NBB.Messaging.slnf @@ -6,11 +6,11 @@ "src\\Messaging\\NBB.Messaging.Effects\\NBB.Messaging.Effects.csproj", "src\\Messaging\\NBB.Messaging.Host\\NBB.Messaging.Host.csproj", "src\\Messaging\\NBB.Messaging.InProcessMessaging\\NBB.Messaging.InProcessMessaging.csproj", + "src\\Messaging\\NBB.Messaging.JetStream\\NBB.Messaging.JetStream.csproj", "src\\Messaging\\NBB.Messaging.MultiTenancy\\NBB.Messaging.MultiTenancy.csproj", "src\\Messaging\\NBB.Messaging.Nats\\NBB.Messaging.Nats.csproj", "src\\Messaging\\NBB.Messaging.Noop\\NBB.Messaging.Noop.csproj", "src\\Messaging\\NBB.Messaging.Rusi\\NBB.Messaging.Rusi.csproj", - "src\\Messaging\\NBB.Messaging.JetStream\\NBB.Messaging.JetStream.csproj", "test\\UnitTests\\Messaging\\NBB.Messaging.Abstractions.Tests\\NBB.Messaging.Abstractions.Tests.csproj", "test\\UnitTests\\Messaging\\NBB.Messaging.DataContracts.Tests\\NBB.Messaging.DataContracts.Tests.csproj", "test\\UnitTests\\Messaging\\NBB.Messaging.Effects.Tests\\NBB.Messaging.Effects.Tests.csproj", diff --git a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs index ae2c29dc..ddb68dfb 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs @@ -51,7 +51,6 @@ async Task MsgHandler(TransportReceiveContext receiveContext) => new MessagingEnvelope(headers, _messageSerDes.DeserializePayload(payloadBytes, headers, options?.SerDes)), _ => throw new Exception("Invalid received message data") }; - throw new Exception("Invalid received message data"); await handler(messageEnvelope); return PipelineResult.SuccessResult; From 91a5236ac02ba8abca89a3e908fdb5abd9a1199d Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Thu, 22 Feb 2024 14:41:16 +0200 Subject: [PATCH 4/8] fix --- .../MessagingPipeline/ExceptionHandlingMiddleware.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs index 45ce3cd9..e880df49 100644 --- a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs +++ b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs @@ -48,8 +48,8 @@ public async Task Invoke(MessagingContext context, CancellationToken cancellatio "An unhandled exception has occurred while processing message of type {MessageType}.", context.MessagingEnvelope.Payload.GetType().GetPrettyName()); - Activity.Current?.SetException(ex); - Activity.Current?.SetStatus(Status.Error); + //Activity.Current?.SetException(ex); + //Activity.Current?.SetStatus(Status.Error); //_deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex); From 416eef4c56df2c6f047aabf9e47df365fa2c85c8 Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Thu, 22 Feb 2024 14:46:57 +0200 Subject: [PATCH 5/8] ff --- .../MessageBusSubscriber.cs | 20 ++++++++++++++----- .../ExceptionHandlingMiddleware.cs | 2 +- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs index ddb68dfb..3047ceba 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs @@ -41,6 +41,7 @@ async Task MsgHandler(TransportReceiveContext receiveContext) _logger.LogDebug("Messaging subscriber received message from subject {Subject}", topicName); MessagingEnvelope messageEnvelope = null; + try { messageEnvelope = receiveContext.ReceivedData switch @@ -51,14 +52,10 @@ async Task MsgHandler(TransportReceiveContext receiveContext) => new MessagingEnvelope(headers, _messageSerDes.DeserializePayload(payloadBytes, headers, options?.SerDes)), _ => throw new Exception("Invalid received message data") }; - - await handler(messageEnvelope); - return PipelineResult.SuccessResult; } catch (Exception ex) { - _logger.LogError(ex, "Messaging subscriber encountered an error when handling a message from subject {Subject}.", - topicName); + _logger.LogError(ex, "Deserialization error on topic {Subject}.", topicName); if (messageEnvelope != null) _deadLetterQueue.Push(messageEnvelope, topicName, ex); @@ -67,6 +64,19 @@ async Task MsgHandler(TransportReceiveContext receiveContext) return new PipelineResult(false, ex.Message); } + + try + { + await handler(messageEnvelope); + return PipelineResult.SuccessResult; + } + catch (Exception ex) + { + _logger.LogError(ex, "Messaging subscriber encountered an error when handling a message from subject {Subject}.", + topicName); + + return new PipelineResult(false, ex.Message); + } } return await _messagingTransport.SubscribeAsync(topicName, MsgHandler, options?.Transport, cancellationToken); diff --git a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs index e880df49..20fea1d9 100644 --- a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs +++ b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs @@ -51,7 +51,7 @@ public async Task Invoke(MessagingContext context, CancellationToken cancellatio //Activity.Current?.SetException(ex); //Activity.Current?.SetStatus(Status.Error); - //_deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex); + _deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex); throw; } From 64ed202148be016534afdefaa391a5ff82d5eb5d Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Thu, 22 Feb 2024 14:57:22 +0200 Subject: [PATCH 6/8] fff --- .../Publisher/OpenTelemetryPublisherDecorator.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs b/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs index d7093c7c..b460585a 100644 --- a/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs +++ b/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs @@ -47,7 +47,11 @@ void NewCustomizer(MessagingEnvelope outgoingEnvelope) var formattedTopicName = _topicRegistry.GetTopicForName(options.TopicName) ?? _topicRegistry.GetTopicForMessageType(message.GetType()); - var operationName = $"{message.GetType().GetPrettyName()} send"; + + var prettyName = message.GetType().GetPrettyName(); + if (prettyName.Contains("AnonymousType")) + prettyName = formattedTopicName; + var operationName = $"{prettyName} send"; using var activity = activitySource.StartActivity(operationName, ActivityKind.Producer); From 40304513f4b1d9513c51c47a77c4c3e8224b400d Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Fri, 23 Feb 2024 08:41:17 +0200 Subject: [PATCH 7/8] ffff --- .../Publisher/OpenTelemetryPublisherDecorator.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs b/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs index b460585a..e9371f17 100644 --- a/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs +++ b/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs @@ -51,6 +51,7 @@ void NewCustomizer(MessagingEnvelope outgoingEnvelope) var prettyName = message.GetType().GetPrettyName(); if (prettyName.Contains("AnonymousType")) prettyName = formattedTopicName; + var operationName = $"{prettyName} send"; using var activity = activitySource.StartActivity(operationName, ActivityKind.Producer); From e3135acbe47f9712800e5b07119efbc784e7b3fe Mon Sep 17 00:00:00 2001 From: Lucian Ghinet Date: Fri, 23 Feb 2024 09:07:16 +0200 Subject: [PATCH 8/8] fix --- .../Subscriber/OpenTelemetrySubscriberDecorator.cs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs b/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs index f3fafaa0..6d4e8e8f 100644 --- a/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs +++ b/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs @@ -1,21 +1,16 @@ // Copyright (c) TotalSoft. // This source code is licensed under the MIT license. +using NBB.Core.Abstractions; using NBB.Messaging.Abstractions; -using OpenTelemetry.Trace; using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; using System; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Reflection; -using System.Text; using System.Threading; using System.Threading.Tasks; -using OpenTelemetry.Context.Propagation; -using NBB.Core.Abstractions; -using NBB.Messaging.OpenTelemetry.Publisher; -using System.Reflection.PortableExecutable; namespace NBB.Messaging.OpenTelemetry.Subscriber {