diff --git a/.gitignore b/.gitignore index 1b1a60e7..5be28dcb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,19 +1,19 @@ -################################################################################ -# This .gitignore file was automatically created by Microsoft(R) Visual Studio. -################################################################################ - -/.vs -/.idea -/artifacts -*/**/bin -*/**/obj -**/packages -*/**/*.csproj.user +################################################################################ +# This .gitignore file was automatically created by Microsoft(R) Visual Studio. +################################################################################ + +/.vs +/.idea +/artifacts +*/**/bin +*/**/obj +**/packages +*/**/*.csproj.user TestResults */**/BenchmarkDotNet.Artifacts **/*.DotSettings.user */**/appsettings.Development.json *.user -*.suo -.fake -.ionide \ No newline at end of file +*.suo +.fake +.ionide diff --git a/NBB.Messaging.slnf b/NBB.Messaging.slnf index efc5ecfc..ae350e15 100644 --- a/NBB.Messaging.slnf +++ b/NBB.Messaging.slnf @@ -6,6 +6,7 @@ "src\\Messaging\\NBB.Messaging.Effects\\NBB.Messaging.Effects.csproj", "src\\Messaging\\NBB.Messaging.Host\\NBB.Messaging.Host.csproj", "src\\Messaging\\NBB.Messaging.InProcessMessaging\\NBB.Messaging.InProcessMessaging.csproj", + "src\\Messaging\\NBB.Messaging.JetStream\\NBB.Messaging.JetStream.csproj", "src\\Messaging\\NBB.Messaging.MultiTenancy\\NBB.Messaging.MultiTenancy.csproj", "src\\Messaging\\NBB.Messaging.Nats\\NBB.Messaging.Nats.csproj", "src\\Messaging\\NBB.Messaging.Noop\\NBB.Messaging.Noop.csproj", diff --git a/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs index 40c11f22..4d7a5021 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs @@ -1,20 +1,20 @@ // Copyright (c) TotalSoft. // This source code is licensed under the MIT license. -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace NBB.Messaging.Abstractions -{ - public interface IMessageBusSubscriber - { - Task SubscribeAsync(Func, Task> handler, - MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default); - - - Task SubscribeAsync(Func handler, - MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default) - => SubscribeAsync(handler, options, cancellationToken); - } -} \ No newline at end of file +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.Messaging.Abstractions +{ + public interface IMessageBusSubscriber + { + Task SubscribeAsync(Func, Task> handler, + MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default); + + + Task SubscribeAsync(Func handler, + MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default) + => SubscribeAsync(handler, options, cancellationToken); + } +} diff --git a/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs b/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs index 37402554..dc787a7a 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/IMessagingTransport.cs @@ -18,7 +18,7 @@ public interface IMessagingTransport /// Subscription options /// /// An object that when disposed unsubscribes the handler from the topic - Task SubscribeAsync(string topic, Func handler, + Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default); /// @@ -45,6 +45,11 @@ public record TransportSendContext( public record TransportReceiveContext(TransportReceivedData ReceivedData); + public record PipelineResult(bool Success, string Error) + { + public static PipelineResult SuccessResult = new PipelineResult(true, null); + } + public abstract record TransportReceivedData { diff --git a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs index a7abbd8c..3047ceba 100644 --- a/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs +++ b/src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs @@ -36,11 +36,12 @@ public async Task SubscribeAsync(Func MsgHandler(TransportReceiveContext receiveContext) { _logger.LogDebug("Messaging subscriber received message from subject {Subject}", topicName); MessagingEnvelope messageEnvelope = null; + try { messageEnvelope = receiveContext.ReceivedData switch @@ -51,19 +52,30 @@ async Task MsgHandler(TransportReceiveContext receiveContext) => new MessagingEnvelope(headers, _messageSerDes.DeserializePayload(payloadBytes, headers, options?.SerDes)), _ => throw new Exception("Invalid received message data") }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Deserialization error on topic {Subject}.", topicName); + + if (messageEnvelope != null) + _deadLetterQueue.Push(messageEnvelope, topicName, ex); + else + _deadLetterQueue.Push(receiveContext.ReceivedData, topicName, ex); + return new PipelineResult(false, ex.Message); + } + + try + { await handler(messageEnvelope); + return PipelineResult.SuccessResult; } catch (Exception ex) { _logger.LogError(ex, "Messaging subscriber encountered an error when handling a message from subject {Subject}.", topicName); - if (messageEnvelope != null) - _deadLetterQueue.Push(messageEnvelope, topicNameWithoutPrefix, ex); - else - _deadLetterQueue.Push(receiveContext.ReceivedData, topicNameWithoutPrefix, ex); - + return new PipelineResult(false, ex.Message); } } diff --git a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs index 543fa95b..20fea1d9 100644 --- a/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs +++ b/src/Messaging/NBB.Messaging.Host/MessagingPipeline/ExceptionHandlingMiddleware.cs @@ -48,10 +48,12 @@ public async Task Invoke(MessagingContext context, CancellationToken cancellatio "An unhandled exception has occurred while processing message of type {MessageType}.", context.MessagingEnvelope.Payload.GetType().GetPrettyName()); - Activity.Current?.SetException(ex); - Activity.Current?.SetStatus(Status.Error); + //Activity.Current?.SetException(ex); + //Activity.Current?.SetStatus(Status.Error); _deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex); + + throw; } finally { diff --git a/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs b/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs index 6a7f68d4..331e2947 100644 --- a/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.InProcessMessaging/Internal/InProcessMessagingTransport.cs @@ -23,7 +23,7 @@ public InProcessMessagingTransport(IStorage storage, ILogger SubscribeAsync(string topic, Func handler, + public async Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { diff --git a/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs b/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs index 2c32a87b..f33cf21c 100644 --- a/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs @@ -34,10 +34,8 @@ public Task PublishAsync(string topic, TransportSendContext sendContext, Cancell }); } - public Task SubscribeAsync(string topic, - Func handler, - SubscriptionTransportOptions options = null, - CancellationToken cancellationToken = default) + public Task SubscribeAsync(string topic, Func> handler, + SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { IDisposable consumer = null; diff --git a/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs b/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs index 94f06832..88074f17 100644 --- a/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs @@ -22,7 +22,7 @@ public StanMessagingTransport(StanConnectionProvider stanConnectionManager, IOpt _natsOptions = natsOptions; } - public Task SubscribeAsync(string topic, Func handler, + public Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { diff --git a/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs b/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs index 9df6dfe2..012fc7ad 100644 --- a/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs @@ -10,7 +10,7 @@ namespace NBB.Messaging.Noop { public class NoopMessagingTransport : IMessagingTransport { - public Task SubscribeAsync(string topic, Func handler, + public Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { diff --git a/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs b/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs index d7093c7c..e9371f17 100644 --- a/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs +++ b/src/Messaging/NBB.Messaging.OpenTelemetry/Publisher/OpenTelemetryPublisherDecorator.cs @@ -47,7 +47,12 @@ void NewCustomizer(MessagingEnvelope outgoingEnvelope) var formattedTopicName = _topicRegistry.GetTopicForName(options.TopicName) ?? _topicRegistry.GetTopicForMessageType(message.GetType()); - var operationName = $"{message.GetType().GetPrettyName()} send"; + + var prettyName = message.GetType().GetPrettyName(); + if (prettyName.Contains("AnonymousType")) + prettyName = formattedTopicName; + + var operationName = $"{prettyName} send"; using var activity = activitySource.StartActivity(operationName, ActivityKind.Producer); diff --git a/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs b/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs index f3fafaa0..6d4e8e8f 100644 --- a/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs +++ b/src/Messaging/NBB.Messaging.OpenTelemetry/Subscriber/OpenTelemetrySubscriberDecorator.cs @@ -1,21 +1,16 @@ // Copyright (c) TotalSoft. // This source code is licensed under the MIT license. +using NBB.Core.Abstractions; using NBB.Messaging.Abstractions; -using OpenTelemetry.Trace; using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; using System; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Reflection; -using System.Text; using System.Threading; using System.Threading.Tasks; -using OpenTelemetry.Context.Propagation; -using NBB.Core.Abstractions; -using NBB.Messaging.OpenTelemetry.Publisher; -using System.Reflection.PortableExecutable; namespace NBB.Messaging.OpenTelemetry.Subscriber { diff --git a/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs b/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs index 89f0ae30..60673f3b 100644 --- a/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs +++ b/src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs @@ -59,7 +59,7 @@ public async Task PublishAsync(string topic, TransportSendContext sendContext, await _client.PublishAsync(request); } - public async Task SubscribeAsync(string topic, Func handler, + public async Task SubscribeAsync(string topic, Func> handler, SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default) { var transport = options ?? SubscriptionTransportOptions.Default; @@ -108,8 +108,8 @@ public async Task SubscribeAsync(string topic, Func>(); - var mockServerStream = Mock.Of>(x => x.Current == msg); - Mock.Get(mockServerStream) - .Setup(m => m.MoveNext(default)) - .Returns(() => Task.FromResult(msgCount-- > 0)); - - var rusiClient = Mock.Of(); - Mock.Get(rusiClient) - .Setup(m => m.Subscribe(null, null, default)) - .Returns(new AsyncDuplexStreamingCall(mockClientStream, - mockServerStream, - Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { })); - - var topic = "topic"; - var subscriber = new RusiMessagingTransport(rusiClient, - new OptionsWrapper(new RusiOptions() { PubsubName = "pubsub1" }), null); - - var handler = Mock.Of>(); - - // Act - using var subscription = await subscriber.SubscribeAsync(topic, handler); - await Task.Delay(100); //TODO: use more reliable awaiting - - // Assert - Mock.Get(rusiClient) - .Verify(x => x.Subscribe(null, null, default)); - - Mock.Get(mockClientStream) - .Verify(x => x.WriteAsync( - It.Is(req => - req.SubscriptionRequest != null && req.SubscriptionRequest.Topic == topic)) - , Times.Once); - - Mock.Get(mockClientStream) - .Verify(x => x.WriteAsync( - It.Is(req => req.AckRequest != null)), Times.Exactly(3)); - - Mock.Get(handler) - .Verify( - handler => handler(It.Is(m => - Encoding.UTF8.GetString(((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData) - .PayloadBytes) == payload && - ((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData).headers["aaa"] == "bbb")), - Times.Exactly(3)); - } - - public class TestMessage - { - public string TestProp { set; get; } - } - } -} +using Grpc.Core; +using Microsoft.Extensions.Options; +using Moq; +using NBB.Messaging.Abstractions; +using Proto.V1; +using System; +using System.Threading.Tasks; +using Xunit; +using Google.Protobuf; +using System.Text; + +namespace NBB.Messaging.Rusi.Tests +{ + public class SubscriberTests + { + [Fact] + public async Task TestSubscriber() + { + // Arrange + var msgCount = 3; + var payload = "{\"TestProp\":\"test1\"}"; + var msg = new ReceivedMessage() + { + Metadata = { { "aaa", "bbb" } }, + Data = ByteString.CopyFromUtf8(payload) + }; + + var mockClientStream = Mock.Of>(); + var mockServerStream = Mock.Of>(x => x.Current == msg); + Mock.Get(mockServerStream) + .Setup(m => m.MoveNext(default)) + .Returns(() => Task.FromResult(msgCount-- > 0)); + + var rusiClient = Mock.Of(); + Mock.Get(rusiClient) + .Setup(m => m.Subscribe(null, null, default)) + .Returns(new AsyncDuplexStreamingCall(mockClientStream, + mockServerStream, + Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { })); + + var topic = "topic"; + var subscriber = new RusiMessagingTransport(rusiClient, + new OptionsWrapper(new RusiOptions() { PubsubName = "pubsub1" }), null); + + var handler = Mock.Of>>(); + + // Act + using var subscription = await subscriber.SubscribeAsync(topic, handler); + await Task.Delay(100); //TODO: use more reliable awaiting + + // Assert + Mock.Get(rusiClient) + .Verify(x => x.Subscribe(null, null, default)); + + Mock.Get(mockClientStream) + .Verify(x => x.WriteAsync( + It.Is(req => + req.SubscriptionRequest != null && req.SubscriptionRequest.Topic == topic)) + , Times.Once); + + Mock.Get(mockClientStream) + .Verify(x => x.WriteAsync( + It.Is(req => req.AckRequest != null)), Times.Exactly(3)); + + Mock.Get(handler) + .Verify( + handler => handler(It.Is(m => + Encoding.UTF8.GetString(((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData) + .PayloadBytes) == payload && + ((TransportReceivedData.PayloadBytesAndHeaders)m.ReceivedData).headers["aaa"] == "bbb")), + Times.Exactly(3)); + } + + public class TestMessage + { + public string TestProp { set; get; } + } + } +}