From 7fc7f438166fdfec648dac01a97f770bed43b14b Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 18 Dec 2024 10:38:05 -0600 Subject: [PATCH] Improved the conversation id tracking for request/response over external message handling. Closes GH-1176 --- .../CoreTests/Acceptance/remote_invocation.cs | 12 ++-- .../Wolverine.RabbitMQ.Tests/end_to_end.cs | 72 +++++++++++++++++++ src/Wolverine/Runtime/MessageContext.cs | 6 +- src/Wolverine/Runtime/Routing/MessageRoute.cs | 4 ++ 4 files changed, 85 insertions(+), 9 deletions(-) diff --git a/src/Testing/CoreTests/Acceptance/remote_invocation.cs b/src/Testing/CoreTests/Acceptance/remote_invocation.cs index a2bd262b9..91d7f5c69 100644 --- a/src/Testing/CoreTests/Acceptance/remote_invocation.cs +++ b/src/Testing/CoreTests/Acceptance/remote_invocation.cs @@ -291,8 +291,7 @@ public async Task sad_path_send_and_wait_for_acknowledgement_with_auto_routing() [Fact] public async Task timeout_on_send_and_wait_with_auto_routing() { - using var nested = _sender.Services.CreateScope(); - var publisher = nested.ServiceProvider.GetRequiredService(); + var publisher = _sender.MessageBus(); var ex = await Should.ThrowAsync(async () => { @@ -305,8 +304,7 @@ public async Task timeout_on_send_and_wait_with_auto_routing() [Fact] public async Task sad_path_request_and_reply_with_no_handler() { - using var nested = _sender.Services.CreateScope(); - var publisher = nested.ServiceProvider.GetRequiredService(); + var publisher = _sender.MessageBus(); var ex = await Should.ThrowAsync(async () => { @@ -320,8 +318,7 @@ public async Task sad_path_request_and_reply_with_no_handler() [Fact] public async Task sad_path_send_and_wait_with_no_handler() { - using var nested = _sender.Services.CreateScope(); - var publisher = nested.ServiceProvider.GetRequiredService(); + var publisher = _sender.MessageBus(); var ex = await Should.ThrowAsync(async () => { @@ -335,8 +332,7 @@ public async Task sad_path_send_and_wait_with_no_handler() [Fact] public async Task sad_path_send_and_wait_with_no_subscription() { - using var nested = _sender.Services.CreateScope(); - var publisher = nested.ServiceProvider.GetRequiredService(); + var publisher = _sender.MessageBus(); await Should.ThrowAsync(() => publisher.InvokeAsync(new RequestWithNoHandler())); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs index 7dcd9de5a..525ac385f 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs @@ -673,6 +673,47 @@ public async Task use_direct_exchange() receiver.Get().Name.ShouldBe("Purple"); } + + + [Fact] + public async Task request_reply_from_within_handler() + { + var queueName = RabbitTesting.NextQueueName(); + using var publisher = WolverineHost.For(opts => + { + opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishAllMessages() + .ToRabbitQueue(queueName); + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + + opts.DisableConventionalDiscovery() + .IncludeType(typeof(RequestColorsHandler)) + .IncludeType(typeof(ColorResponseHandler)); + }); + + + using var receiver = WolverineHost.For(opts => + { + opts.DisableConventionalDiscovery() + .IncludeType(typeof(ColorRequestHandler)); + + opts.UseRabbitMq().AutoProvision().DisableDeadLetterQueueing(); + + opts.ListenToRabbitQueue(queueName); + }); + + await receiver.ResetResourceState(); + + await publisher + .TrackActivity() + .AlsoTrack(receiver) + .Timeout(30.Seconds()) // this one can be slow when it's in a group of tests + .InvokeMessageAndWaitAsync(new RequestColors(["red", "green", "blue", "orange"])); + //.InvokeMessageAndWaitAsync(new RequestColors(["red"])); + } + } public class SpecialTopicGuy @@ -776,4 +817,35 @@ public static ValueTask Handle( // this use case return context.RespondToSenderAsync(response); } +} + +public record ColorRequest(string Color); +public record ColorResponse(string Color); + +public static class ColorRequestHandler +{ + public static async Task Handle(ColorRequest request) + { + await Task.Delay(Random.Shared.Next(0, 500).Milliseconds()); + return new ColorResponse(request.Color); + } +} + +public static class ColorResponseHandler +{ + public static void Handle(ColorResponse response) => Debug.WriteLine("Got color response for " + response.Color); +} + +public record RequestColors(string[] Colors); + +public static class RequestColorsHandler +{ + public static async Task HandleAsync(RequestColors message, IMessageBus bus) + { + for (int i = 0; i < message.Colors.Length; i++) + { + var response = await bus.InvokeAsync(new ColorRequest(message.Colors[i]), timeout:30.Seconds()); + response.Color.ShouldBe(message.Colors[i]); + } + } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index caf498d07..ad00fe642 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -462,7 +462,11 @@ internal override void TrackEnvelopeCorrelation(Envelope outbound, Activity? act { base.TrackEnvelopeCorrelation(outbound, activity); outbound.SagaId = _sagaId?.ToString() ?? Envelope?.SagaId ?? outbound.SagaId; - outbound.ConversationId = ConversationId; + + if (ConversationId != Guid.Empty) + { + outbound.ConversationId = ConversationId; + } if (Envelope != null) { diff --git a/src/Wolverine/Runtime/Routing/MessageRoute.cs b/src/Wolverine/Runtime/Routing/MessageRoute.cs index 4cac1f35a..92d557800 100644 --- a/src/Wolverine/Runtime/Routing/MessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/MessageRoute.cs @@ -143,6 +143,10 @@ public async Task InvokeAsync(object message, MessageBus bus, envelope.Sender = Sender; bus.TrackEnvelopeCorrelation(envelope, Activity.Current); + + // The request/reply envelope *must* use the envelope id for the conversation id + // for proper tracking. See https://github.com/JasperFx/wolverine/issues/1176 + envelope.ConversationId = envelope.Id; var waiter = _replyTracker.RegisterListener(envelope, cancellation, timeout.Value);