From 9a06076507486c1b0511cdcfe11b30e5ad566434 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 18 Jul 2024 17:59:13 +0100 Subject: [PATCH] Add logging enhancements and improve error handling The commit refactors the error handling and logging within the NatsSvcServer and NatsSvcEndPoint classes. Updates have been made to the SvcListener, enhancing message logging and ensuring clean unsubscription on object disposal. The changes also facilitate better debugging and analysis by adding more logging statements for subscription handling within the SubscriptionManager class. Additional updates are made to handle possible errors in the handler loop within the NatsSvcEndPoint class. --- .../Internal/SubscriptionManager.cs | 29 ++++-- .../Internal/SvcListener.cs | 48 ++++++++-- src/NATS.Client.Services/NatsSvcEndPoint.cs | 92 +++++++++++-------- src/NATS.Client.Services/NatsSvcLogEvents.cs | 1 + src/NATS.Client.Services/NatsSvcServer.cs | 2 +- .../ServicesSerializationTest.cs | 69 ++++++++++++++ 6 files changed, 188 insertions(+), 53 deletions(-) diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index 872423324..7e285d1a5 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -18,6 +18,7 @@ internal sealed record SubscriptionMetadata(int Sid); internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable { private readonly ILogger _logger; + private readonly bool _debug; private readonly object _gate = new(); private readonly NatsConnection _connection; private readonly string _inboxPrefix; @@ -37,6 +38,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) _connection = connection; _inboxPrefix = inboxPrefix; _logger = _connection.Opts.LoggerFactory.CreateLogger(); + _debug = _logger.IsEnabled(LogLevel.Debug); _cts = new CancellationTokenSource(); _cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval; _timer = Task.Run(CleanupAsync); @@ -148,19 +150,26 @@ public async ValueTask DisposeAsync() public ValueTask RemoveAsync(NatsSubBase sub) { - if (!_bySub.TryGetValue(sub, out var subMetadata)) - { - // this can happen when a call to SubscribeAsync is canceled or timed out before subscribing - // in that case, return as there is nothing to unsubscribe - return default; - } - + SubscriptionMetadata? subMetadata; lock (_gate) { + if (!_bySub.TryGetValue(sub, out subMetadata)) + { + // this can happen when a call to SubscribeAsync is canceled or timed out before subscribing + // in that case, return as there is nothing to unsubscribe + _logger.LogInformation(NatsLogEvents.Subscription, "No need to remove subscription {Subject}", sub.Subject); + return default; + } + _bySub.Remove(sub); _bySid.TryRemove(subMetadata.Sid, out _); } + if (_debug) + { + _logger.LogDebug(NatsLogEvents.Subscription, "Removing subscription {Subject}/{Sid}", sub.Subject, subMetadata.Sid); + } + return _connection.UnsubscribeAsync(subMetadata.Sid); } @@ -234,6 +243,12 @@ await SubscribeInternalAsync( private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { var sid = GetNextSid(); + + if (_debug) + { + _logger.LogDebug(NatsLogEvents.Subscription, "New subscription {Subject}/{Sid}", sub.Subject, sid); + } + lock (_gate) { _bySid[sid] = new SidMetadata(Subject: subject, WeakReference: new WeakReference(sub)); diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 88fdd234d..6b3edb220 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -1,20 +1,24 @@ using System.Threading.Channels; +using Microsoft.Extensions.Logging; using NATS.Client.Core; namespace NATS.Client.Services.Internal; internal class SvcListener : IAsyncDisposable { + private readonly ILogger _logger; private readonly NatsConnection _nats; private readonly Channel _channel; private readonly SvcMsgType _type; private readonly string _subject; private readonly string? _queueGroup; private readonly CancellationTokenSource _cts; + private INatsSub>? _sub; private Task? _readLoop; - public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) + public SvcListener(ILogger logger, NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) { + _logger = logger; _nats = nats; _channel = channel; _type = type; @@ -25,27 +29,59 @@ public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type public async ValueTask StartAsync() { - var sub = await _nats.SubscribeCoreAsync(_subject, _queueGroup, serializer: NatsRawSerializer>.Default, cancellationToken: _cts.Token); + _sub = await _nats.SubscribeCoreAsync(_subject, _queueGroup, serializer: NatsRawSerializer>.Default, cancellationToken: _cts.Token).ConfigureAwait(false); _readLoop = Task.Run(async () => { - await using (sub) + try { - await foreach (var msg in sub.Msgs.ReadAllAsync()) + await foreach (var msg in _sub.Msgs.ReadAllAsync(_cts.Token).ConfigureAwait(false)) { - await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cts.Token).ConfigureAwait(false); + try + { + await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogWarning(NatsSvcLogEvents.Listener, e, "Error writing message to {Subject} listener channel", _subject); + } } } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogError(NatsSvcLogEvents.Listener, e, "Error in {Subject} subscription", _subject); + } }); } public async ValueTask DisposeAsync() { _cts.Cancel(); + + if (_sub != null) + { + try + { + await _sub.DisposeAsync().ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + } + catch (OperationCanceledException) + { + } + } + if (_readLoop != null) { try { - await _readLoop; + await _readLoop.ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index b13a722ff..cdecd7578 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -191,19 +191,23 @@ protected override ValueTask ReceiveInternalAsync( try { msg = NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer); - exception = null; + exception = msg.Error; } catch (Exception e) { _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error building message", Name); exception = e; - // Most likely a serialization error. // Make sure we have a valid message // so handler can reply with an error. msg = new NatsMsg(subject, replyTo, subject.Length + (replyTo?.Length ?? 0), default, default, _nats); } + if (exception is not null) + { + _logger.LogWarning(NatsSvcLogEvents.Endpoint, exception, "Endpoint {Name} error receiving message", Name); + } + return _channel.Writer.WriteAsync(new NatsSvcMsg(msg, this, exception), _cancellationToken); } @@ -212,57 +216,67 @@ protected override ValueTask ReceiveInternalAsync( private async Task HandlerLoop() { var stopwatch = new Stopwatch(); - await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) + try { - Interlocked.Increment(ref _requests); - stopwatch.Restart(); - try - { - await _handler(svcMsg).ConfigureAwait(false); - } - catch (Exception e) + await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) { - int code; - string message; - string body; - if (e is NatsSvcEndpointException epe) - { - code = epe.Code; - message = epe.Message; - body = epe.Body; - } - else + Interlocked.Increment(ref _requests); + stopwatch.Restart(); + try { - // Do not expose exceptions unless explicitly - // thrown as NatsSvcEndpointException - code = 999; - message = "Handler error"; - body = string.Empty; - - // Only log unknown exceptions - _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error processing message", Name); + await _handler(svcMsg).ConfigureAwait(false); } - - try + catch (Exception e) { - if (string.IsNullOrWhiteSpace(body)) + int code; + string message; + string body; + if (e is NatsSvcEndpointException epe) { - await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken); + code = epe.Code; + message = epe.Message; + body = epe.Body; } else { - await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken); + // Do not expose exceptions unless explicitly + // thrown as NatsSvcEndpointException + code = 999; + message = "Handler error"; + body = string.Empty; + + // Only log unknown exceptions + _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error processing message", Name); + } + + try + { + if (string.IsNullOrWhiteSpace(body)) + { + await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken); + } + else + { + await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken); + } + } + catch (Exception e1) + { + _logger.LogError(NatsSvcLogEvents.Endpoint, e1, "Endpoint {Name} error responding", Name); } } - catch (Exception e1) + finally { - _logger.LogError(NatsSvcLogEvents.Endpoint, e1, "Endpoint {Name} error responding", Name); + Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed)); } } - finally - { - Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed)); - } + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error in handler loop", Name); } } diff --git a/src/NATS.Client.Services/NatsSvcLogEvents.cs b/src/NATS.Client.Services/NatsSvcLogEvents.cs index fe2c248c9..40e6a399a 100644 --- a/src/NATS.Client.Services/NatsSvcLogEvents.cs +++ b/src/NATS.Client.Services/NatsSvcLogEvents.cs @@ -5,4 +5,5 @@ namespace NATS.Client.Services; public static class NatsSvcLogEvents { public static readonly EventId Endpoint = new(5001, nameof(Endpoint)); + public static readonly EventId Listener = new(5002, nameof(Listener)); } diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index e785db65a..417ad5348 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -198,7 +198,7 @@ internal async ValueTask StartAsync() foreach (var subject in new[] { $"$SRV.{type}", $"$SRV.{type}.{name}", $"$SRV.{type}.{name}.{_id}" }) { // for discovery subjects do not use a queue group - var svcListener = new SvcListener(_nats, _channel, svcType, subject, default, _cts.Token); + var svcListener = new SvcListener(_logger, _nats, _channel, svcType, subject, default, _cts.Token); await svcListener.StartAsync(); _svcListeners.Add(svcListener); } diff --git a/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs b/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs index 68d478ef9..022ecba5e 100644 --- a/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs +++ b/tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs @@ -1,3 +1,6 @@ +using System.Text; +using System.Text.Json; +using System.Threading.Channels; using NATS.Client.Core.Tests; using NATS.Client.Serializers.Json; using NATS.Client.Services.Internal; @@ -39,4 +42,70 @@ public async Task Service_info_and_stat_request_serialization() Assert.Equal("s1", stats[0].Name); Assert.Equal("1.0.0", stats[0].Version); } + + [Fact] + public async Task Service_message_serialization() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(new NatsOpts { SerializerRegistry = NatsJsonSerializerRegistry.Default }); + + var svc = new NatsSvcContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + var sync = Channel.CreateUnbounded(); + var wait1 = new WaitSignal(); + var wait2 = new WaitSignal(); + + await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); + await s1.AddEndpointAsync( + name: "e1", + handler: async m => + { + if (m.Exception is NatsDeserializeException de) + { + await m.ReplyAsync(1, cancellationToken: cancellationToken); + wait1.Pulse(de); + return; + } + + if (m.Data is { Name: "sync" }) + { + await m.ReplyAsync(0, cancellationToken: cancellationToken); + sync.Writer.TryWrite(1); + return; + } + + await m.ReplyAsync(0, cancellationToken: cancellationToken); + wait2.Pulse(m.Data!); + }, + cancellationToken: cancellationToken); + + await Retry.Until( + "synced", + () => sync.Reader.TryRead(out _), + async () => await nats.RequestAsync("e1", new TestData("sync"), cancellationToken: cancellationToken)); + + var brokenJson = "{\"Name\": broken"; + var r1 = await nats.RequestAsync("e1", brokenJson, requestSerializer: NatsUtf8PrimitivesSerializer.Default, cancellationToken: cancellationToken); + Assert.Equal(1, r1.Data); + var de = await wait1; + Assert.Equal(brokenJson, Encoding.ASCII.GetString(de.RawData)); + + if (de.InnerException is JsonException je) + { + Assert.Contains("'b' is an invalid start of a value", je.Message); + } + else + { + Assert.Fail("Expected JsonException"); + } + + var r2 = await nats.RequestAsync("e1", new TestData("abc"), cancellationToken: cancellationToken); + Assert.Equal(0, r2.Data); + var testData = await wait2; + } + + private record TestData(string Name); }