Skip to content

Demonstrate handling of Channel.Close #1791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/Impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ await _session0.HandleFrameAsync(frame, cancellationToken)
// frames for non-zero channels (and any inbound
// commands on channel zero that aren't
// Connection.CloseOk) must be discarded.
if (_closeReason is null)
if (CloseReason is null)
{
// No close reason, not quiescing the
// connection. Handle the frame. (Of course, the
Expand Down
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/Impl/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public ISession Lookup(int number)
{
lock (_sessionMap)
{
/*
* Note: rabbitmq/rabbitmq-server#13337
* When investigating the above issue, a couple KeyNotFoundExceptions
* were thrown here during test shutdown. No reliable reproducer.
*/
return _sessionMap[number];
}
}
Expand Down
164 changes: 164 additions & 0 deletions projects/Test/Integration/GH/TestGitHubIssues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
Expand Down Expand Up @@ -165,5 +167,167 @@ await Assert.ThrowsAnyAsync<BrokerUnreachableException>(
async () => await _connFactory.CreateConnectionAsync());
Assert.IsAssignableFrom<PossibleAuthenticationFailureException>(ex.InnerException);
}

[Fact]
public async Task SendInvalidPublishMaybeClosesConnection_GH13387()
{
const int messageCount = 200;

_connFactory = new ConnectionFactory();
_conn = await _connFactory.CreateConnectionAsync();

var opts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
_channel = await _conn.CreateChannelAsync(opts);

await _channel.BasicQosAsync(0, 10, false);

string queueName = GenerateQueueName();
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
Assert.Equal(queueName, q.QueueName);

byte[] body = Encoding.ASCII.GetBytes("incoming message");
var publishTasks = new List<ValueTask>();
for (int i = 0; i < messageCount; i++)
{
ValueTask pt = _channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
body: body);
publishTasks.Add(pt);
if (i % 20 == 0)
{
foreach (ValueTask t in publishTasks)
{
await t;
}
publishTasks.Clear();
}
}

foreach (ValueTask t in publishTasks)
{
await t;
}
publishTasks.Clear();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

_conn.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("_conn.CallbackExceptionAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
return Task.CompletedTask;
};

_conn.ConnectionShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (args.Exception is not null)
{
if (IsVerbose)
{
_output.WriteLine("_conn.ConnectionShutdownAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
}
else
{
if (IsVerbose)
{
_output.WriteLine("_conn.ConnectionShutdownAsync");
}
tcs.TrySetResult(false);
}
return Task.CompletedTask;
};

_channel.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("_channel.CallbackExceptionAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
return Task.CompletedTask;
};

_channel.ChannelShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (args.Exception is not null)
{
if (IsVerbose)
{
_output.WriteLine("_channel.ChannelShutdownAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
}
else
{
if (IsVerbose)
{
_output.WriteLine("_channel.ChannelShutdownAsync");
}
tcs.TrySetResult(false);
}
return Task.CompletedTask;
};

var ackExceptions = new List<Exception>();
var publishExceptions = new List<Exception>();
var props = new BasicProperties { Expiration = "-1" };
int receivedCounter = 0;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) =>
{
var c = (AsyncEventingBasicConsumer)sender;
IChannel ch = c.Channel;
try
{
await ch.BasicAckAsync(args.DeliveryTag, false);
}
catch (Exception ex)
{
ackExceptions.Add(ex);
}

try
{
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
mandatory: true, basicProperties: props, body: body);
}
catch (Exception ex)
{
publishExceptions.Add(ex);
}

if (Interlocked.Increment(ref receivedCounter) >= messageCount)
{
tcs.SetResult(true);
}
};

consumer.ShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("consumer.ShutdownAsync");
}
return Task.CompletedTask;
};

await _channel.BasicConsumeAsync(queueName, false, consumer);

await tcs.Task;

if (IsVerbose)
{
_output.WriteLine("saw {0} ackExceptions", ackExceptions.Count);
_output.WriteLine("saw {0} publishExceptions", publishExceptions.Count);
}
}
}
}