Skip to content

Commit

Permalink
Add logging enhancements and improve error handling (#570)
Browse files Browse the repository at this point in the history
The commit refactors the error handling and logging within the NatsSvcServer and NatsSvcEndPoint classes. Updates have been made to the SvcListener, enhancing message logging and ensuring clean unsubscription on object disposal. The changes also facilitate better debugging and analysis by adding more logging statements for subscription handling within the SubscriptionManager class. Additional updates are made to handle possible errors in the handler loop within the NatsSvcEndPoint class.
  • Loading branch information
mtmk authored Jul 18, 2024
1 parent 3e2cb12 commit 509969a
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 53 deletions.
29 changes: 22 additions & 7 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal sealed record SubscriptionMetadata(int Sid);
internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable
{
private readonly ILogger<SubscriptionManager> _logger;
private readonly bool _debug;
private readonly object _gate = new();
private readonly NatsConnection _connection;
private readonly string _inboxPrefix;
Expand All @@ -37,6 +38,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
_connection = connection;
_inboxPrefix = inboxPrefix;
_logger = _connection.Opts.LoggerFactory.CreateLogger<SubscriptionManager>();
_debug = _logger.IsEnabled(LogLevel.Debug);
_cts = new CancellationTokenSource();
_cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval;
_timer = Task.Run(CleanupAsync);
Expand Down Expand Up @@ -148,19 +150,26 @@ public async ValueTask DisposeAsync()

public ValueTask RemoveAsync(NatsSubBase sub)
{
if (!_bySub.TryGetValue(sub, out var subMetadata))
{
// this can happen when a call to SubscribeAsync is canceled or timed out before subscribing
// in that case, return as there is nothing to unsubscribe
return default;
}

SubscriptionMetadata? subMetadata;
lock (_gate)
{
if (!_bySub.TryGetValue(sub, out subMetadata))
{
// this can happen when a call to SubscribeAsync is canceled or timed out before subscribing
// in that case, return as there is nothing to unsubscribe
_logger.LogInformation(NatsLogEvents.Subscription, "No need to remove subscription {Subject}", sub.Subject);
return default;
}

_bySub.Remove(sub);
_bySid.TryRemove(subMetadata.Sid, out _);
}

if (_debug)
{
_logger.LogDebug(NatsLogEvents.Subscription, "Removing subscription {Subject}/{Sid}", sub.Subject, subMetadata.Sid);
}

return _connection.UnsubscribeAsync(subMetadata.Sid);
}

Expand Down Expand Up @@ -234,6 +243,12 @@ await SubscribeInternalAsync(
private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
var sid = GetNextSid();

if (_debug)
{
_logger.LogDebug(NatsLogEvents.Subscription, "New subscription {Subject}/{Sid}", sub.Subject, sid);
}

lock (_gate)
{
_bySid[sid] = new SidMetadata(Subject: subject, WeakReference: new WeakReference<NatsSubBase>(sub));
Expand Down
48 changes: 42 additions & 6 deletions src/NATS.Client.Services/Internal/SvcListener.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

namespace NATS.Client.Services.Internal;

internal class SvcListener : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly NatsConnection _nats;
private readonly Channel<SvcMsg> _channel;
private readonly SvcMsgType _type;
private readonly string _subject;
private readonly string? _queueGroup;
private readonly CancellationTokenSource _cts;
private INatsSub<NatsMemoryOwner<byte>>? _sub;
private Task? _readLoop;

public SvcListener(NatsConnection nats, Channel<SvcMsg> channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken)
public SvcListener(ILogger logger, NatsConnection nats, Channel<SvcMsg> channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken)
{
_logger = logger;
_nats = nats;
_channel = channel;
_type = type;
Expand All @@ -25,27 +29,59 @@ public SvcListener(NatsConnection nats, Channel<SvcMsg> channel, SvcMsgType type

public async ValueTask StartAsync()
{
var sub = await _nats.SubscribeCoreAsync(_subject, _queueGroup, serializer: NatsRawSerializer<NatsMemoryOwner<byte>>.Default, cancellationToken: _cts.Token);
_sub = await _nats.SubscribeCoreAsync(_subject, _queueGroup, serializer: NatsRawSerializer<NatsMemoryOwner<byte>>.Default, cancellationToken: _cts.Token).ConfigureAwait(false);
_readLoop = Task.Run(async () =>
{
await using (sub)
try
{
await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in _sub.Msgs.ReadAllAsync(_cts.Token).ConfigureAwait(false))
{
await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cts.Token).ConfigureAwait(false);
try
{
await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception e)
{
_logger.LogWarning(NatsSvcLogEvents.Listener, e, "Error writing message to {Subject} listener channel", _subject);
}
}
}
catch (OperationCanceledException)
{
}
catch (Exception e)
{
_logger.LogError(NatsSvcLogEvents.Listener, e, "Error in {Subject} subscription", _subject);
}
});
}

public async ValueTask DisposeAsync()
{
_cts.Cancel();

if (_sub != null)
{
try
{
await _sub.DisposeAsync().ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
}
catch (OperationCanceledException)
{
}
}

if (_readLoop != null)
{
try
{
await _readLoop;
await _readLoop.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
Expand Down
92 changes: 53 additions & 39 deletions src/NATS.Client.Services/NatsSvcEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,23 @@ protected override ValueTask ReceiveInternalAsync(
try
{
msg = NatsMsg<T>.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer);
exception = null;
exception = msg.Error;
}
catch (Exception e)
{
_logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error building message", Name);
exception = e;

// Most likely a serialization error.
// Make sure we have a valid message
// so handler can reply with an error.
msg = new NatsMsg<T>(subject, replyTo, subject.Length + (replyTo?.Length ?? 0), default, default, _nats);
}

if (exception is not null)
{
_logger.LogWarning(NatsSvcLogEvents.Endpoint, exception, "Endpoint {Name} error receiving message", Name);
}

return _channel.Writer.WriteAsync(new NatsSvcMsg<T>(msg, this, exception), _cancellationToken);
}

Expand All @@ -212,57 +216,67 @@ protected override ValueTask ReceiveInternalAsync(
private async Task HandlerLoop()
{
var stopwatch = new Stopwatch();
await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
try
{
Interlocked.Increment(ref _requests);
stopwatch.Restart();
try
{
await _handler(svcMsg).ConfigureAwait(false);
}
catch (Exception e)
await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
{
int code;
string message;
string body;
if (e is NatsSvcEndpointException epe)
{
code = epe.Code;
message = epe.Message;
body = epe.Body;
}
else
Interlocked.Increment(ref _requests);
stopwatch.Restart();
try
{
// Do not expose exceptions unless explicitly
// thrown as NatsSvcEndpointException
code = 999;
message = "Handler error";
body = string.Empty;

// Only log unknown exceptions
_logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error processing message", Name);
await _handler(svcMsg).ConfigureAwait(false);
}

try
catch (Exception e)
{
if (string.IsNullOrWhiteSpace(body))
int code;
string message;
string body;
if (e is NatsSvcEndpointException epe)
{
await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken);
code = epe.Code;
message = epe.Message;
body = epe.Body;
}
else
{
await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken);
// Do not expose exceptions unless explicitly
// thrown as NatsSvcEndpointException
code = 999;
message = "Handler error";
body = string.Empty;

// Only log unknown exceptions
_logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error processing message", Name);
}

try
{
if (string.IsNullOrWhiteSpace(body))
{
await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken);
}
else
{
await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken);
}
}
catch (Exception e1)
{
_logger.LogError(NatsSvcLogEvents.Endpoint, e1, "Endpoint {Name} error responding", Name);
}
}
catch (Exception e1)
finally
{
_logger.LogError(NatsSvcLogEvents.Endpoint, e1, "Endpoint {Name} error responding", Name);
Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed));
}
}
finally
{
Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed));
}
}
catch (OperationCanceledException)
{
}
catch (Exception e)
{
_logger.LogError(NatsSvcLogEvents.Endpoint, e, "Endpoint {Name} error in handler loop", Name);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.Services/NatsSvcLogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ namespace NATS.Client.Services;
public static class NatsSvcLogEvents
{
public static readonly EventId Endpoint = new(5001, nameof(Endpoint));
public static readonly EventId Listener = new(5002, nameof(Listener));
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Services/NatsSvcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ internal async ValueTask StartAsync()
foreach (var subject in new[] { $"$SRV.{type}", $"$SRV.{type}.{name}", $"$SRV.{type}.{name}.{_id}" })
{
// for discovery subjects do not use a queue group
var svcListener = new SvcListener(_nats, _channel, svcType, subject, default, _cts.Token);
var svcListener = new SvcListener(_logger, _nats, _channel, svcType, subject, default, _cts.Token);
await svcListener.StartAsync();
_svcListeners.Add(svcListener);
}
Expand Down
69 changes: 69 additions & 0 deletions tests/NATS.Client.Services.Tests/ServicesSerializationTest.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using NATS.Client.Core.Tests;
using NATS.Client.Serializers.Json;
using NATS.Client.Services.Internal;
Expand Down Expand Up @@ -39,4 +42,70 @@ public async Task Service_info_and_stat_request_serialization()
Assert.Equal("s1", stats[0].Name);
Assert.Equal("1.0.0", stats[0].Version);
}

[Fact]
public async Task Service_message_serialization()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts { SerializerRegistry = NatsJsonSerializerRegistry.Default });

var svc = new NatsSvcContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var sync = Channel.CreateUnbounded<int>();
var wait1 = new WaitSignal<NatsDeserializeException>();
var wait2 = new WaitSignal<TestData>();

await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);
await s1.AddEndpointAsync<TestData>(
name: "e1",
handler: async m =>
{
if (m.Exception is NatsDeserializeException de)
{
await m.ReplyAsync(1, cancellationToken: cancellationToken);
wait1.Pulse(de);
return;
}

if (m.Data is { Name: "sync" })
{
await m.ReplyAsync(0, cancellationToken: cancellationToken);
sync.Writer.TryWrite(1);
return;
}

await m.ReplyAsync(0, cancellationToken: cancellationToken);
wait2.Pulse(m.Data!);
},
cancellationToken: cancellationToken);

await Retry.Until(
"synced",
() => sync.Reader.TryRead(out _),
async () => await nats.RequestAsync<TestData, int>("e1", new TestData("sync"), cancellationToken: cancellationToken));

var brokenJson = "{\"Name\": broken";
var r1 = await nats.RequestAsync<string, int>("e1", brokenJson, requestSerializer: NatsUtf8PrimitivesSerializer<string>.Default, cancellationToken: cancellationToken);
Assert.Equal(1, r1.Data);
var de = await wait1;
Assert.Equal(brokenJson, Encoding.ASCII.GetString(de.RawData));

if (de.InnerException is JsonException je)
{
Assert.Contains("'b' is an invalid start of a value", je.Message);
}
else
{
Assert.Fail("Expected JsonException");
}

var r2 = await nats.RequestAsync<TestData, int>("e1", new TestData("abc"), cancellationToken: cancellationToken);
Assert.Equal(0, r2.Data);
var testData = await wait2;
}

private record TestData(string Name);
}

0 comments on commit 509969a

Please sign in to comment.