diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index 872423324..7e285d1a5 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -18,6 +18,7 @@ internal sealed record SubscriptionMetadata(int Sid); internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable { private readonly ILogger _logger; + private readonly bool _debug; private readonly object _gate = new(); private readonly NatsConnection _connection; private readonly string _inboxPrefix; @@ -37,6 +38,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) _connection = connection; _inboxPrefix = inboxPrefix; _logger = _connection.Opts.LoggerFactory.CreateLogger(); + _debug = _logger.IsEnabled(LogLevel.Debug); _cts = new CancellationTokenSource(); _cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval; _timer = Task.Run(CleanupAsync); @@ -148,19 +150,26 @@ public async ValueTask DisposeAsync() public ValueTask RemoveAsync(NatsSubBase sub) { - if (!_bySub.TryGetValue(sub, out var subMetadata)) - { - // this can happen when a call to SubscribeAsync is canceled or timed out before subscribing - // in that case, return as there is nothing to unsubscribe - return default; - } - + SubscriptionMetadata? subMetadata; lock (_gate) { + if (!_bySub.TryGetValue(sub, out subMetadata)) + { + // this can happen when a call to SubscribeAsync is canceled or timed out before subscribing + // in that case, return as there is nothing to unsubscribe + _logger.LogInformation(NatsLogEvents.Subscription, "No need to remove subscription {Subject}", sub.Subject); + return default; + } + _bySub.Remove(sub); _bySid.TryRemove(subMetadata.Sid, out _); } + if (_debug) + { + _logger.LogDebug(NatsLogEvents.Subscription, "Removing subscription {Subject}/{Sid}", sub.Subject, subMetadata.Sid); + } + return _connection.UnsubscribeAsync(subMetadata.Sid); } @@ -234,6 +243,12 @@ await SubscribeInternalAsync( private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { var sid = GetNextSid(); + + if (_debug) + { + _logger.LogDebug(NatsLogEvents.Subscription, "New subscription {Subject}/{Sid}", sub.Subject, sid); + } + lock (_gate) { _bySid[sid] = new SidMetadata(Subject: subject, WeakReference: new WeakReference(sub)); diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 88fdd234d..6b3edb220 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -1,20 +1,24 @@ using System.Threading.Channels; +using Microsoft.Extensions.Logging; using NATS.Client.Core; namespace NATS.Client.Services.Internal; internal class SvcListener : IAsyncDisposable { + private readonly ILogger _logger; private readonly NatsConnection _nats; private readonly Channel _channel; private readonly SvcMsgType _type; private readonly string _subject; private readonly string? _queueGroup; private readonly CancellationTokenSource _cts; + private INatsSub>? _sub; private Task? _readLoop; - public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) + public SvcListener(ILogger logger, NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) { + _logger = logger; _nats = nats; _channel = channel; _type = type; @@ -25,27 +29,59 @@ public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type public async ValueTask StartAsync() { - var sub = await _nats.SubscribeCoreAsync(_subject, _queueGroup, serializer: NatsRawSerializer>.Default, cancellationToken: _cts.Token); + _sub = await _nats.SubscribeCoreAsync(_subject, _queueGroup, serializer: NatsRawSerializer>.Default, cancellationToken: _cts.Token).ConfigureAwait(false); _readLoop = Task.Run(async () => { - await using (sub) + try { - await foreach (var msg in sub.Msgs.ReadAllAsync()) + await foreach (var msg in _sub.Msgs.ReadAllAsync(_cts.Token).ConfigureAwait(false)) { - await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cts.Token).ConfigureAwait(false); + try + { + await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogWarning(NatsSvcLogEvents.Listener, e, "Error writing message to {Subject} listener channel", _subject); + } } } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogError(NatsSvcLogEvents.Listener, e, "Error in {Subject} subscription", _subject); + } }); } public async ValueTask DisposeAsync() { _cts.Cancel(); + + if (_sub != null) + { + try + { + await _sub.DisposeAsync().ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + } + catch (OperationCanceledException) + { + } + } + if (_readLoop != null) { try { - await _readLoop; + await _readLoop.ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index b13a722ff..cdecd7578 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -191,19 +191,23 @@ protected override ValueTask ReceiveInternalAsync( try { msg = NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer); - exception = null; + exception = msg.Error; } catch (Exception e) { _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error building message", Name); exception = e; - // Most likely a serialization error. // Make sure we have a valid message // so handler can reply with an error. msg = new NatsMsg(subject, replyTo, subject.Length + (replyTo?.Length ?? 0), default, default, _nats); } + if (exception is not null) + { + _logger.LogWarning(NatsSvcLogEvents.Endpoint, exception, "Endpoint {Name} error receiving message", Name); + } + return _channel.Writer.WriteAsync(new NatsSvcMsg(msg, this, exception), _cancellationToken); } @@ -212,57 +216,67 @@ protected override ValueTask ReceiveInternalAsync( private async Task HandlerLoop() { var stopwatch = new Stopwatch(); - await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) + try { - Interlocked.Increment(ref _requests); - stopwatch.Restart(); - try - { - await _handler(svcMsg).ConfigureAwait(false); - } - catch (Exception e) + await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) { - int code; - string message; - string body; - if (e is NatsSvcEndpointException epe) - { - code = epe.Code; - message = epe.Message; - body = epe.Body; - } - else + Interlocked.Increment(ref _requests); + stopwatch.Restart(); + try { - // Do not expose exceptions unless explicitly - // thrown as NatsSvcEndpointException - code = 999; - message = "Handler error"; - body = string.Empty; - - // Only log unknown exceptions - _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error processing message", Name); + await _handler(svcMsg).ConfigureAwait(false); } - - try + catch (Exception e) { - if (string.IsNullOrWhiteSpace(body)) + int code; + string message; + string body; + if (e is NatsSvcEndpointException epe) { - await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken); + code = epe.Code; + message = epe.Message; + body = epe.Body; } else { - await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken); + // Do not expose exceptions unless explicitly + // thrown as NatsSvcEndpointException + code = 999; + message = "Handler error"; + body = string.Empty; + + // Only log unknown exceptions + _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error processing message", Name); + } + + try + { + if (string.IsNullOrWhiteSpace(body)) + { + await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken); + } + else + { + await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken); + } + } + catch (Exception e1) + { + _logger.LogError(NatsSvcLogEvents.Endpoint, e1, "Endpoint {Name} error responding", Name); } } - catch (Exception e1) + finally { - _logger.LogError(NatsSvcLogEvents.Endpoint, e1, "Endpoint {Name} error responding", Name); + Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed)); } } - finally - { - Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed)); - } + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error in handler loop", Name); } } diff --git a/src/NATS.Client.Services/NatsSvcLogEvents.cs b/src/NATS.Client.Services/NatsSvcLogEvents.cs index fe2c248c9..40e6a399a 100644 --- a/src/NATS.Client.Services/NatsSvcLogEvents.cs +++ b/src/NATS.Client.Services/NatsSvcLogEvents.cs @@ -5,4 +5,5 @@ namespace NATS.Client.Services; public static class NatsSvcLogEvents { public static readonly EventId Endpoint = new(5001, nameof(Endpoint)); + public static readonly EventId Listener = new(5002, nameof(Listener)); } diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index e785db65a..417ad5348 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -198,7 +198,7 @@ internal async ValueTask StartAsync() foreach (var subject in new[] { $"$SRV.{type}", $"$SRV.{type}.{name}", $"$SRV.{type}.{name}.{_id}" }) { // for discovery subjects do not use a queue group - var svcListener = new SvcListener(_nats, _channel, svcType, subject, default, _cts.Token); + var svcListener = new SvcListener(_logger, _nats, _channel, svcType, subject, default, _cts.Token); await svcListener.StartAsync(); _svcListeners.Add(svcListener); } diff --git a/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs b/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs index 68d478ef9..022ecba5e 100644 --- a/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs +++ b/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs @@ -1,3 +1,6 @@ +using System.Text; +using System.Text.Json; +using System.Threading.Channels; using NATS.Client.Core.Tests; using NATS.Client.Serializers.Json; using NATS.Client.Services.Internal; @@ -39,4 +42,70 @@ public async Task Service_info_and_stat_request_serialization() Assert.Equal("s1", stats[0].Name); Assert.Equal("1.0.0", stats[0].Version); } + + [Fact] + public async Task Service_message_serialization() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(new NatsOpts { SerializerRegistry = NatsJsonSerializerRegistry.Default }); + + var svc = new NatsSvcContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + var sync = Channel.CreateUnbounded(); + var wait1 = new WaitSignal(); + var wait2 = new WaitSignal(); + + await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); + await s1.AddEndpointAsync( + name: "e1", + handler: async m => + { + if (m.Exception is NatsDeserializeException de) + { + await m.ReplyAsync(1, cancellationToken: cancellationToken); + wait1.Pulse(de); + return; + } + + if (m.Data is { Name: "sync" }) + { + await m.ReplyAsync(0, cancellationToken: cancellationToken); + sync.Writer.TryWrite(1); + return; + } + + await m.ReplyAsync(0, cancellationToken: cancellationToken); + wait2.Pulse(m.Data!); + }, + cancellationToken: cancellationToken); + + await Retry.Until( + "synced", + () => sync.Reader.TryRead(out _), + async () => await nats.RequestAsync("e1", new TestData("sync"), cancellationToken: cancellationToken)); + + var brokenJson = "{\"Name\": broken"; + var r1 = await nats.RequestAsync("e1", brokenJson, requestSerializer: NatsUtf8PrimitivesSerializer.Default, cancellationToken: cancellationToken); + Assert.Equal(1, r1.Data); + var de = await wait1; + Assert.Equal(brokenJson, Encoding.ASCII.GetString(de.RawData)); + + if (de.InnerException is JsonException je) + { + Assert.Contains("'b' is an invalid start of a value", je.Message); + } + else + { + Assert.Fail("Expected JsonException"); + } + + var r2 = await nats.RequestAsync("e1", new TestData("abc"), cancellationToken: cancellationToken); + Assert.Equal(0, r2.Data); + var testData = await wait2; + } + + private record TestData(string Name); }