Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding fully asynchronous versions of connect and publish. #1311

Merged
merged 7 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ csharp_style_inlined_variable_declaration = true:suggestion
csharp_style_throw_expression = true:suggestion
csharp_style_conditional_delegate_call = true:suggestion

# Async methods should have "Async" suffix
dotnet_naming_rule.async_methods_end_in_async.symbols = any_async_methods
dotnet_naming_rule.async_methods_end_in_async.style = end_in_async
dotnet_naming_rule.async_methods_end_in_async.severity = warning

dotnet_naming_symbols.any_async_methods.applicable_kinds = method
dotnet_naming_symbols.any_async_methods.applicable_accessibilities = *
dotnet_naming_symbols.any_async_methods.required_modifiers = async

dotnet_naming_style.end_in_async.required_suffix = Async
dotnet_naming_style.end_in_async.capitalization = pascal_case_style

# Other features
csharp_style_prefer_index_operator = false:none
csharp_style_prefer_range_operator = false:none
Expand Down
9 changes: 8 additions & 1 deletion projects/Benchmarks/Benchmarks.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PropertyGroup Condition="$([MSBuild]::IsOSPlatform('Windows'))">
<TargetFrameworks>net6.0;net472</TargetFrameworks>
</PropertyGroup>

<PropertyGroup Condition="!$([MSBuild]::IsOSPlatform('Windows'))">
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
Expand Down
20 changes: 20 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,26 @@ void BasicPublish<TProperties>(string exchange, string routingKey, in TPropertie
/// </remarks>
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
/// <summary>
/// Asynchronously publishes a message.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
/// <summary>
/// Asynchronously publishes a message.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
#nullable disable

/// <summary>
Expand Down
7 changes: 7 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -93,6 +94,12 @@ public static void BasicPublish(this IChannel channel, string exchange, string r

public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
#nullable disable

/// <summary>
Expand Down
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -109,6 +110,11 @@ public override void _Private_ConnectionOpen(string virtualHost)
ChannelSend(new ConnectionOpen(virtualHost));
}

public override ValueTask _Private_ConnectionOpenAsync(string virtualHost)
{
return ModelSendAsync(new ConnectionOpen(virtualHost));
}

public override void _Private_ConnectionSecureOk(byte[] response)
{
ChannelSend(new ConnectionSecureOk(response));
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);

public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
ThrowIfDisposed();
Expand Down
141 changes: 86 additions & 55 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ namespace RabbitMQ.Client.Impl
internal abstract class ChannelBase : IChannel, IRecoverable
{
///<summary>Only used to kick-start a connection open
///sequence. See <see cref="Connection.Open"/> </summary>
internal BlockingCell<ConnectionStartDetails> m_connectionStartCell;
///sequence. See <see cref="Connection.OpenAsync"/> </summary>
internal TaskCompletionSource<ConnectionStartDetails> m_connectionStartCell;

// AMQP only allows one RPC operation to be active at a time.
private readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

Expand Down Expand Up @@ -239,32 +241,18 @@ private async Task CloseAsync(ShutdownEventArgs reason, bool abort)
}
}

internal void ConnectionOpen(string virtualHost)
internal async ValueTask ConnectionOpenAsync(string virtualHost)
{
var k = new SimpleBlockingRpcContinuation();
lock (_rpcLock)
{
Enqueue(k);
try
{
_Private_ConnectionOpen(virtualHost);
}
catch (AlreadyClosedException)
{
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}
k.GetReply(HandshakeContinuationTimeout);
}
await _Private_ConnectionOpenAsync(virtualHost).TimeoutAfter(HandshakeContinuationTimeout);
}

internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
internal async ValueTask<ConnectionSecureOrTune> ConnectionSecureOkAsync(byte[] response)
{
var k = new ConnectionStartRpcContinuation();
lock (_rpcLock)
var k = new ConnectionSecureOrTuneContinuation();
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
Enqueue(k);
try
{
Enqueue(k);
try
{
_Private_ConnectionSecureOk(response);
Expand All @@ -275,31 +263,40 @@ internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
// which is a much more suitable exception before connection
// negotiation finishes
}
k.GetReply(HandshakeContinuationTimeout);

return await k;
}
finally
{
_rpcSemaphore.Release();
}
return k.m_result;
}

internal ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
internal async ValueTask<ConnectionSecureOrTune> ConnectionStartOkAsync(IDictionary<string, object> clientProperties, string mechanism, byte[] response,
string locale)
{
var k = new ConnectionStartRpcContinuation();
lock (_rpcLock)
var k = new ConnectionSecureOrTuneContinuation();
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
Enqueue(k);
try
{
Enqueue(k);
try
{
_Private_ConnectionStartOk(clientProperties, mechanism,
response, locale);
_Private_ConnectionStartOk(clientProperties, mechanism, response, locale);
}
catch (AlreadyClosedException)
{
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}
k.GetReply(HandshakeContinuationTimeout);

return await k;
}
finally
{
_rpcSemaphore.Release();
}
return k.m_result;
}

protected abstract bool DispatchAsynchronous(in IncomingCommand cmd);
Expand All @@ -324,7 +321,7 @@ internal void FinishClose()
Session.Close(reason);
}

m_connectionStartCell?.ContinueWithValue(null);
m_connectionStartCell?.TrySetResult(null);
}

private void HandleCommand(in IncomingCommand cmd)
Expand Down Expand Up @@ -385,6 +382,12 @@ protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
Session.Transmit(in method);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<T>(in T method) where T : struct, IOutgoingAmqpMethod
{
return Session.TransmitAsync(in method);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
where TMethod : struct, IOutgoingAmqpMethod
Expand All @@ -397,6 +400,19 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
Session.Transmit(in method, in header, body);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
where TMethod : struct, IOutgoingAmqpMethod
where THeader : IAmqpHeader
{
if (!_flowControlBlock.IsSet)
{
_flowControlBlock.Wait();
}

return Session.TransmitAsync(in method, in header, body);
}

internal void OnCallbackException(CallbackExceptionEventArgs args)
{
_callbackExceptionWrapper.Invoke(this, args);
Expand Down Expand Up @@ -730,13 +746,7 @@ protected void HandleConnectionClose(in IncomingCommand cmd)

protected void HandleConnectionSecure(in IncomingCommand cmd)
{
var challenge = new ConnectionSecure(cmd.MethodBytes.Span)._challenge;
cmd.ReturnMethodBuffer();
var k = (ConnectionStartRpcContinuation)_continuationQueue.Next();
k.m_result = new ConnectionSecureOrTune
{
m_challenge = challenge
};
var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next();
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
}

Expand All @@ -758,25 +768,14 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
m_mechanisms = method._mechanisms,
m_locales = method._locales
};
m_connectionStartCell.ContinueWithValue(details);
m_connectionStartCell?.SetResult(details);
m_connectionStartCell = null;
}

protected void HandleConnectionTune(in IncomingCommand cmd)
{
var connectionTune = new ConnectionTune(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
var k = (ConnectionStartRpcContinuation)_continuationQueue.Next();
k.m_result = new ConnectionSecureOrTune
{
m_tuneDetails =
{
m_channelMax = connectionTune._channelMax,
m_frameMax = connectionTune._frameMax,
m_heartbeatInSeconds = connectionTune._heartbeat
}
};
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next();
k.HandleCommand(cmd); // release the continuation.
}

protected void HandleConnectionUnblocked()
Expand Down Expand Up @@ -815,6 +814,8 @@ protected void HandleQueueDeclareOk(in IncomingCommand cmd)

public abstract void _Private_ConnectionOpen(string virtualHost);

public abstract ValueTask _Private_ConnectionOpenAsync(string virtualHost);

public abstract void _Private_ConnectionSecureOk(byte[] response);

public abstract void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale);
Expand Down Expand Up @@ -930,6 +931,36 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
ChannelSend(in cmd, in basicProperties, body);
}

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
}

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
}

public void UpdateSecret(string newSecret, string reason)
{
if (newSecret is null)
Expand Down
Loading