Skip to content

Commit

Permalink
Revert to old WaitForConfirmsOrDieAsync behavior (reverts part of #999)…
Browse files Browse the repository at this point in the history
…. Fix test
  • Loading branch information
lukebakken committed Oct 29, 2023
1 parent a262b61 commit f85296e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
9 changes: 6 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1652,13 +1652,16 @@ await CloseAsync(
new IOException("nack received")),
false).ConfigureAwait(false);
}
catch (TaskCanceledException exception)
catch (TaskCanceledException)
{
const string msg = "timed out waiting for acks";
var ex = new IOException(msg);
await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.ReplySuccess,
"Timed out waiting for acks",
exception),
msg,
ex),
false).ConfigureAwait(false);
throw ex;
}
}

Expand Down
31 changes: 16 additions & 15 deletions projects/Test/Integration/TestPublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ namespace Test.Integration
{
public class TestPublisherConfirms : IntegrationFixture
{
private const string QueueName = "Test.Integration.TestPublisherConfirms";
private readonly byte[] _body = new byte[4096];

public TestPublisherConfirms(ITestOutputHelper output) : base(output)
Expand Down Expand Up @@ -79,10 +78,10 @@ public Task TestWaitForConfirmsWithTimeout()
}

[Fact]
public Task TestWaitForConfirmsWithTimeoutAsync_MightThrowTaskCanceledException()
public async Task TestWaitForConfirmsWithTimeoutAsync_MightThrowTaskCanceledException()
{
bool waitResult = false;
bool sawTaskCanceled = false;
bool sawException = false;

Task t = TestWaitForConfirmsAsync(10000, async (ch) =>
{
Expand All @@ -92,19 +91,19 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MightThrowTaskCanceledException(
{
waitResult = await ch.WaitForConfirmsAsync(cts.Token);
}
catch (TaskCanceledException)
catch
{
sawTaskCanceled = true;
sawException = true;
}
}
});

if (waitResult == false && sawTaskCanceled == false)
await t;

if (waitResult == false && sawException == false)
{
Assert.Fail("test failed, both waitResult and sawTaskCanceled are still false");
Assert.Fail("test failed, both waitResult and sawException are still false");
}

return t;
}

[Fact]
Expand All @@ -128,10 +127,11 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout
[Fact]
public async Task TestWaitForConfirmsWithEventsAsync()
{
string queueName = string.Format("{0}:{1}", _testDisplayName, Guid.NewGuid());
using (IChannel ch = _conn.CreateChannel())
{
await ch.ConfirmSelectAsync();
await ch.QueueDeclareAsync(queue: QueueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null);
await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null);

int n = 200;
// number of event handler invocations
Expand All @@ -146,7 +146,7 @@ public async Task TestWaitForConfirmsWithEventsAsync()
{
for (int i = 0; i < n; i++)
{
await ch.BasicPublishAsync("", QueueName, _encoding.GetBytes("msg"));
await ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg"));
}

await ch.WaitForConfirmsAsync();
Expand All @@ -159,23 +159,24 @@ public async Task TestWaitForConfirmsWithEventsAsync()
}
finally
{
await ch.QueueDeleteAsync(queue: QueueName, ifUnused: false, ifEmpty: false);
await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false);
}
}
}

private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func<IChannel, Task> fn)
{
string queueName = string.Format("{0}:{1}", _testDisplayName, Guid.NewGuid());
using (IChannel ch = _conn.CreateChannel())
{
var props = new BasicProperties { Persistent = true };

await ch.ConfirmSelectAsync();
await ch.QueueDeclareAsync(queue: QueueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null);
await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null);

for (int i = 0; i < numberOfMessagesToPublish; i++)
{
await ch.BasicPublishAsync(exchange: "", routingKey: QueueName, body: _body, mandatory: true, basicProperties: props);
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: _body, mandatory: true, basicProperties: props);
}

try
Expand All @@ -184,7 +185,7 @@ private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func<
}
finally
{
await ch.QueueDeleteAsync(queue: QueueName, ifUnused: false, ifEmpty: false);
await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false);
}
}
}
Expand Down

0 comments on commit f85296e

Please sign in to comment.