Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
using System.Collections.Concurrent;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.ErrorHandling;
using Wolverine.Runtime.Handlers;
using Xunit;
using Xunit.Abstractions;

namespace Wolverine.RabbitMQ.Tests.Bugs;

/// <summary>
/// Regression test for GH-2078: a message whose handler is configured with
/// <c>PauseThenRequeue</c> must be processed again after the pause, even when the
/// RabbitMQ listener is limited to <c>PreFetchCount(1)</c>.
/// </summary>
public class Bug_2078_pause_then_requeue : IAsyncLifetime
{
    private readonly ITestOutputHelper _output;
    private readonly string _queueName;

    // Assigned in InitializeAsync(), which xUnit invokes before the test body.
    private IHost _host = null!;

    public Bug_2078_pause_then_requeue(ITestOutputHelper output)
    {
        _output = output;
        _queueName = RabbitTesting.NextQueueName();
    }

    /// <summary>
    /// Resets the shared handler state and starts a host that both publishes to and
    /// listens on a fresh RabbitMQ queue with a prefetch count of 1.
    /// </summary>
    public async Task InitializeAsync()
    {
        PauseThenRequeueHandler.Reset();

        _host = await Host.CreateDefaultBuilder()
            .UseWolverine(opts =>
            {
                opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup().DisableDeadLetterQueueing();

                opts.PublishMessage<PauseThenRequeueMessage>()
                    .ToRabbitQueue(_queueName);

                opts.ListenToRabbitQueue(_queueName)
                    .PreFetchCount(1);

                opts.LocalRoutingConventionDisabled = true;
            }).StartAsync();
    }

    /// <summary>
    /// Tears down the broker resources, then stops and disposes the host.
    /// Does nothing if the host was never started.
    /// </summary>
    public async Task DisposeAsync()
    {
        if (_host is null)
        {
            return;
        }

        // NOTE(review): the result of TeardownResources() is discarded here. If this
        // extension returns a Task it should be awaited before StopAsync() -- confirm
        // against the JasperFx.Resources API.
        _host.TeardownResources();
        await _host.StopAsync();
        _host.Dispose();
    }

    [Fact]
    public async Task pause_then_requeue_should_eventually_reprocess_message()
    {
        // Reproduce GH-2078: with PreFetchCount(1) and PauseThenRequeue,
        // the message should be reprocessed after the pause period.
        // The handler succeeds on attempt 2.
        var bus = _host.Services.GetRequiredService<IMessageBus>();
        await bus.PublishAsync(new PauseThenRequeueMessage());

        // With PauseThenRequeue(3.Seconds()), the message should:
        // 1. Fail on attempt 1
        // 2. Be requeued and listener paused for 3 seconds
        // 3. Succeed on attempt 2 after the pause
        // Total time: ~3-5 seconds
        // Without the fix, the message gets stuck with PreFetchCount(1) and never reprocesses.
        var success = await Poll(30.Seconds(), () => PauseThenRequeueHandler.Succeeded);

        // ConcurrentBag has no defined order, so sort the timestamps chronologically.
        var attempts = PauseThenRequeueHandler.Attempts.OrderBy(x => x).ToArray();

        _output.WriteLine($"Total handler invocations: {attempts.Length}");
        for (var i = 0; i < attempts.Length; i++)
        {
            _output.WriteLine($" Attempt {i + 1}: {attempts[i]:O}");
        }

        // Starting at index 1 means nothing is printed when there are fewer than two attempts.
        for (var i = 1; i < attempts.Length; i++)
        {
            var gap = attempts[i] - attempts[i - 1];
            _output.WriteLine($" Gap between attempt {i} and {i + 1}: {gap.TotalSeconds:F1}s");
        }

        success.ShouldBeTrue("Message was never successfully reprocessed after PauseThenRequeue. " +
                             "With PreFetchCount(1), the original un-ACKed message blocks the listener.");

        attempts.Length.ShouldBeGreaterThanOrEqualTo(2,
            "Expected at least 2 handler invocations (fail then succeed)");

        // Verify there is a meaningful gap (~3s) between attempts from the pause
        var firstGap = attempts[1] - attempts[0];
        _output.WriteLine($"\n First gap: {firstGap.TotalSeconds:F1}s (expected ~3s)");
        firstGap.TotalSeconds.ShouldBeGreaterThan(2.0,
            $"Expected at least 2 seconds between attempts (configured 3s pause), but gap was {firstGap.TotalSeconds:F1}s");
    }

    /// <summary>
    /// Evaluates <paramref name="condition"/> every 250 ms until it returns true or
    /// <paramref name="timeout"/> elapses.
    /// </summary>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <param name="condition">Predicate to evaluate on each iteration.</param>
    /// <returns>
    /// True as soon as the condition holds; otherwise the result of one final
    /// evaluation made after the timeout.
    /// </returns>
    private static async Task<bool> Poll(TimeSpan timeout, Func<bool> condition)
    {
        // The token source owns a timer, so dispose it once polling is finished.
        using var cts = new CancellationTokenSource(timeout);
        while (!cts.IsCancellationRequested)
        {
            if (condition())
            {
                return true;
            }

            await Task.Delay(250.Milliseconds());
        }

        return condition();
    }
}

/// <summary>
/// Payload-free message published by the GH-2078 test and consumed by
/// <see cref="PauseThenRequeueHandler"/>.
/// </summary>
public record PauseThenRequeueMessage;

/// <summary>
/// Exception thrown by <see cref="PauseThenRequeueHandler"/> on its first invocation;
/// the handler's error policy matches on this type.
/// </summary>
public class PauseThenRequeueException : Exception
{
    /// <summary>Creates the exception with its fixed "Rate limit exceeded" message.</summary>
    public PauseThenRequeueException()
        : base("Rate limit exceeded")
    {
    }
}

/// <summary>
/// Test handler that fails its first invocation with <see cref="PauseThenRequeueException"/>
/// and succeeds on every later one, recording a timestamp for each call in static state.
/// </summary>
public class PauseThenRequeueHandler
{
    /// <summary>UTC timestamp of every invocation of <see cref="Handle"/> since the last reset.</summary>
    public static readonly ConcurrentBag<DateTimeOffset> Attempts = new();

    /// <summary>Set to true once an invocation other than the first has run.</summary>
    public static volatile bool Succeeded;

    // Number of Handle() invocations since the last Reset().
    private static int _attemptCount;

    /// <summary>Clears the recorded attempts, the success flag and the invocation counter.</summary>
    public static void Reset()
    {
        Attempts.Clear();
        Succeeded = false;
        _attemptCount = 0;
    }

    /// <summary>
    /// Registers a pause-then-requeue policy with a three second pause for
    /// <see cref="PauseThenRequeueException"/> on this handler's chain.
    /// </summary>
    public static void Configure(HandlerChain chain)
    {
        var policy = chain.OnException<PauseThenRequeueException>();
        policy.PauseThenRequeue(3.Seconds());
    }

    /// <summary>
    /// Records the invocation time, then throws on the first call and flags success on any other.
    /// </summary>
    /// <exception cref="PauseThenRequeueException">Thrown on the first invocation only.</exception>
    public static void Handle(PauseThenRequeueMessage message)
    {
        var invokedAt = DateTimeOffset.UtcNow;
        Attempts.Add(invokedAt);

        var invocation = Interlocked.Increment(ref _attemptCount);
        if (invocation != 1)
        {
            // Succeed on attempt 2+
            Succeeded = true;
            return;
        }

        throw new PauseThenRequeueException();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ internal async Task CompleteAsync()
Acknowledged = true;
}

/// <summary>
/// Requeues this envelope: acknowledges the original delivery if that has not
/// happened yet, then sends a new copy of the message back to the queue.
/// </summary>
internal async ValueTask DeferAsync()
{
    // ACK the original delivery to release the prefetch slot,
    // then send a new copy to the queue for later processing.
    // Without ACKing first, the original message stays "in-flight" in RabbitMQ
    // and consumes a prefetch slot (blocking PreFetchCount(1) entirely).
    if (!Acknowledged)
    {
        await RabbitMqListener.CompleteAsync(DeliveryTag);

        // Only flag the envelope once the ACK call has completed.
        Acknowledged = true;
    }

    await RabbitMqListener.RequeueAsync(this);
}
}
43 changes: 40 additions & 3 deletions src/Wolverine/ErrorHandling/RequeueContinuation.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Transports;

namespace Wolverine.ErrorHandling;

Expand All @@ -21,13 +23,48 @@ internal RequeueContinuation(TimeSpan delay)
/// <summary>
/// Records the requeue on the tracing activity and defers the envelope back to its
/// transport. When <c>Delay</c> is set, the envelope's listener is additionally
/// paused for that duration from a background task.
/// </summary>
/// <param name="lifecycle">Lifecycle of the envelope being requeued.</param>
/// <param name="runtime">Runtime used to locate the listener and to log failures.</param>
/// <param name="now">Current time supplied by the caller; not used by this continuation.</param>
/// <param name="activity">Optional tracing activity that receives the requeue event.</param>
public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now,
    Activity? activity)
{
    activity?.AddEvent(new ActivityEvent(WolverineTracing.EnvelopeRequeued));

    // Defer/requeue the message back to the transport, with or without a pause.
    await lifecycle.DeferAsync();

    if (Delay == null)
    {
        return;
    }

    // Capture the values the background task needs before it starts.
    var pause = Delay.Value;
    var envelope = lifecycle.Envelope!;

    // Schedule the listener pause on a background task to avoid deadlocking
    // the BufferedReceiver's DrainAsync (which waits for in-flight messages,
    // including this one, to complete).
    _ = Task.Run(async () =>
    {
        try
        {
            var agent = findListenerCircuit(envelope, runtime);
            if (agent != null)
            {
                await agent.PauseAsync(pause);
            }
        }
        catch (Exception e)
        {
            // Nothing awaits this task, so log the failure here or it is lost.
            runtime.Logger.LogError(e, "Error pausing listener for PauseThenRequeue");
        }
    });
}

/// <summary>
/// Looks up the listener circuit for an envelope: the local queue agent when the
/// envelope's destination uses the "local" scheme, otherwise the listening agent
/// registered for the address of the envelope's listener.
/// </summary>
private static IListenerCircuit? findListenerCircuit(Envelope envelope, IWolverineRuntime runtime)
{
    // Any destination that is missing or not "local" is resolved through the listener address.
    if (envelope.Destination is not { Scheme: "local" } localQueue)
    {
        return runtime.Endpoints.FindListeningAgent(envelope.Listener!.Address);
    }

    return runtime.Endpoints.AgentForLocalQueue(localQueue) as IListenerCircuit;
}

/// <summary>Human-readable description of this continuation.</summary>
public string Description
{
    get { return "Defer or Re-queue the message for later processing"; }
}
Expand Down
Loading