Skip to content

Commit

Permalink
Fix JS API deserializer (#709)
Browse files Browse the repository at this point in the history
* Fix JS API deserializer

* Format

* Fix JetStream API serialization with typed results

Replace JsonDocument-based responses with strongly-typed `NatsJSApiResult<T>`
for improved type safety and error handling. Added new deserialization logic
and tests to cover valid responses, errors, and edge cases like empty buffers.

* Format
  • Loading branch information
mtmk authored Jan 13, 2025
1 parent 2d3a347 commit 6bf76e2
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 27 deletions.
59 changes: 59 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Runtime.CompilerServices;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Internal;

internal readonly struct NatsJSApiResult<T>
{
private readonly T? _value;
private readonly ApiError? _error;
private readonly Exception? _exception;

public NatsJSApiResult(T value)
{
_value = value;
_error = null;
_exception = null;
}

public NatsJSApiResult(ApiError error)
{
_value = default;
_error = error;
_exception = null;
}

public NatsJSApiResult(Exception exception)
{
_value = default;
_error = null;
_exception = exception;
}

public T Value => _value ?? ThrowValueIsNotSetException();

public ApiError Error => _error ?? ThrowErrorIsNotSetException();

public Exception Exception => _exception ?? ThrowExceptionIsNotSetException();

public bool Success => _error == null && _exception == null;

public bool HasError => _error != null;

public bool HasException => _exception != null;

public static implicit operator NatsJSApiResult<T>(T value) => new(value);

public static implicit operator NatsJSApiResult<T>(ApiError error) => new(error);

public static implicit operator NatsJSApiResult<T>(Exception exception) => new(exception);

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static ApiError ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");

private static Exception ThrowExceptionIsNotSetException() => throw CreateInvalidOperationException("Result exception is not set");

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message);
}
35 changes: 32 additions & 3 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,38 @@

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize<JsonDocument>
internal sealed class NatsJSJsonDocumentSerializer<T> : INatsDeserialize<NatsJSApiResult<T>>
{
public static readonly NatsJSJsonDocumentSerializer Default = new();
public static readonly NatsJSJsonDocumentSerializer<T> Default = new();

public JsonDocument? Deserialize(in ReadOnlySequence<byte> buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer);
public NatsJSApiResult<T> Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return new NatsJSException("Buffer is empty");
}

using var jsonDocument = JsonDocument.Parse(buffer);

if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return error;
}

var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(T));
if (jsonTypeInfo == null)
{
return new NatsJSException($"Unknown response type {typeof(T)}");
}

var result = (T?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (result == null)
{
return new NatsJSException("Null result");
}

return result;
}
}
33 changes: 9 additions & 24 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,13 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
// Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await Connection.CreateRequestSubAsync<TRequest, JsonDocument>(
await using var sub = await Connection.CreateRequestSubAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
headers: default,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSJsonDocumentSerializer.Default,
replySerializer: NatsJSJsonDocumentSerializer<TResponse>.Default,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand All @@ -326,37 +326,22 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
return new NatsNoRespondersException();
}

if (msg.Data == null)
{
return new NatsJSException("No response data received");
}

// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
// API deserialize to the final type from the document.
using var jsonDocument = msg.Data;

if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
if (msg.Error is { } messageError)
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return new NatsJSResponse<TResponse>(default, error);
return messageError;
}

var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse));
if (jsonTypeInfo == null)
if (msg.Data.HasException)
{
return new NatsJSException($"Unknown response type {typeof(TResponse)}");
return msg.Data.Exception;
}

var response = (TResponse?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (msg.Error is { } messageError)
if (msg.Data.HasError)
{
return messageError;
return new NatsJSResponse<TResponse>(null, msg.Data.Error);
}

return new NatsJSResponse<TResponse>(response, default);
return new NatsJSResponse<TResponse>(msg.Data.Value, null);
}

if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
Expand Down
128 changes: 128 additions & 0 deletions tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
using System.Buffers;
using System.Text;
using NATS.Client.Core2.Tests;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;
using JsonSerializer = System.Text.Json.JsonSerializer;

namespace NATS.Client.JetStream.Tests;

[Collection("nats-server")]
public class JetStreamApiSerializerTest
{
private readonly ITestOutputHelper _output;
private readonly NatsServerFixture _server;

public JetStreamApiSerializerTest(ITestOutputHelper output, NatsServerFixture server)
{
_output = output;
_server = server;
}

[Fact]
public async Task Should_respect_buffers_lifecycle()
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
var apiSubject = $"{prefix}.js.fake.api";
var dataSubject = $"{prefix}.data";

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var ctsDone = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);

List<Task> tasks = new();

// Keep reader buffers busy with lots of data which should not be
// kept around and used by the JsonDocument deserializer.
// Data reader
tasks.Add(Task.Run(
async () =>
{
await foreach (var unused in nats.SubscribeAsync<string>(dataSubject, cancellationToken: ctsDone.Token))
{
}
},
cts.Token));

// Data writer
tasks.Add(Task.Run(
async () =>
{
var data = new string('x', 1024);
while (ctsDone.IsCancellationRequested == false)
{
await nats.PublishAsync(dataSubject, data, cancellationToken: ctsDone.Token);
}
},
cts.Token));

// Fake JS API responder
tasks.Add(Task.Run(
async () =>
{
var json = JsonSerializer.Serialize(new AccountInfoResponse { Consumers = 1234 });
await foreach (var msg in nats.SubscribeAsync<object>(apiSubject, cancellationToken: ctsDone.Token))
{
await msg.ReplyAsync(json, cancellationToken: cts.Token);
}
},
cts.Token));

// Fake JS API requester
tasks.Add(Task.Run(
async () =>
{
for (var i = 0; i < 100; i++)
{
if (ctsDone.IsCancellationRequested)
return;

try
{
var result = await js.TryJSRequestAsync<object, AccountInfoResponse>(apiSubject, null, ctsDone.Token);
}
catch
{
ctsDone.Cancel();
throw;
}
}

ctsDone.Cancel();
},
cts.Token));

try
{
await Task.WhenAll(tasks);
}
catch (TaskCanceledException)
{
}
}

[Fact]
public void Deserialize_value()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("""{"memory":1}""")));
result.Value.Memory.Should().Be(1);
}

[Fact]
public void Deserialize_empty_buffer()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(ReadOnlySequence<byte>.Empty);
result.Exception.Message.Should().Be("Buffer is empty");
}

[Fact]
public void Deserialize_error()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("""{"error":{"code":2}}""")));
result.Error.Code.Should().Be(2);
}
}

0 comments on commit 6bf76e2

Please sign in to comment.