Skip to content

Commit

Permalink
Implement async for simple AMQP methods
Browse files Browse the repository at this point in the history
Related to:
* #1345
* #1308
* #970
* #843

Implement QueueDeleteAsync, ExchangeDeclareAsync and ExchangeDeleteAsync. Refactoring to come.

Fix public API

Move rpc continuations to their own files

Add continuation timeouts to new AsyncRpcContinuations classes.

Add ExchangeBindAsync to interface
  • Loading branch information
lukebakken committed Oct 24, 2023
1 parent a0321c6 commit c6959f6
Show file tree
Hide file tree
Showing 9 changed files with 518 additions and 101 deletions.
49 changes: 46 additions & 3 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// </remarks>
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);

/// <summary>
/// Asynchronously binds an exchange to an exchange.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask ExchangeBindAsync(string destination, string source, string routingKey, IDictionary<string, object> arguments);

/// <summary>
/// Like ExchangeBind but sets nowait to true.
/// </summary>
Expand All @@ -289,10 +299,17 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// <summary>Declare an exchange.</summary>
/// <remarks>
/// The exchange is declared non-passive and non-internal.
/// The "nowait" option is not exercised.
/// The "nowait" option is not used.
/// </remarks>
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);

/// <summary>Asynchronously declare an exchange.</summary>
/// <remarks>
/// The exchange is declared non-passive and non-internal.
/// The "nowait" option is not exercised.
/// </remarks>
ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);

/// <summary>
/// Same as ExchangeDeclare but sets nowait to true and returns void (as there
/// will be no response from the server).
Expand All @@ -315,6 +332,19 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// </summary>
void ExchangeDelete(string exchange, bool ifUnused);

/*
* TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
/// <summary>
/// Asynchronously delete an exchange.
/// </summary>
ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused);
*/

/// <summary>
/// Asynchronously delete an exchange.
/// </summary>
ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused);

/// <summary>
/// Like ExchangeDelete but sets nowait to true.
/// </summary>
Expand Down Expand Up @@ -411,17 +441,30 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
uint ConsumerCount(string queue);

/// <summary>
/// Delete a queue.
/// Deletes a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
/// <param name="queue">The name of the queue.</param>
/// <param name="ifUnused">Only delete the queue if it is unused.</param>
/// <param name="ifEmpty">Only delete the queue if it is empty.</param>
/// <returns>Returns the number of messages purged during deletion.</returns>
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);

/// <summary>
/// Asynchronously deletes a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
/// <remarks>
///Returns the number of messages purged during queue deletion.
/// </remarks>
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
ValueTask<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty);

/// <summary>
///Same as QueueDelete but sets nowait parameter to true
///and returns void (as there will be no response from the server)
/// </summary>
/// <param name="queue">The name of the queue.</param>
/// <param name="ifUnused">Only delete the queue if it is unused.</param>
/// <param name="ifEmpty">Only delete the queue if it is empty.</param>
/// <returns>Returns the number of messages purged during deletion.</returns>
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);

/// <summary>
Expand Down
55 changes: 55 additions & 0 deletions projects/RabbitMQ.Client/client/api/QueueDeleteOk.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

namespace RabbitMQ.Client
{
/// <summary>
/// Represents Queue deletion information.
/// </summary>
public class QueueDeleteOk
{
private readonly uint _messageCount;

/// <summary>
/// Creates a new instance of <see cref="QueueDeleteOk"/>.
/// </summary>
/// <param name="messageCount">Message count.</param>
public QueueDeleteOk(uint messageCount)
{
_messageCount = messageCount;
}

/// <summary>
/// Count of messages purged when queue was deleted.
/// </summary>
public uint MessageCount => _messageCount;
}
}
226 changes: 226 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
{
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation, IDisposable
{
private readonly CancellationTokenSource _ct;

protected readonly TaskCompletionSource<T> _tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

private bool _disposedValue;

public AsyncRpcContinuation(TimeSpan continuationTimeout)
{
_ct = new CancellationTokenSource(continuationTimeout);
_ct.Token.Register(() =>
{
if (_tcs.TrySetCanceled())
{
// TODO LRB #1347
// Cancellation was successful, does this mean we should set a TimeoutException
// in the same manner as BlockingCell?
}
}, useSynchronizationContext: false);
}

public TaskAwaiter<T> GetAwaiter() => _tcs.Task.GetAwaiter();

// TODO LRB #1347
// What to do if setting a result fails?
public abstract void HandleCommand(in IncomingCommand cmd);

public void HandleChannelShutdown(ShutdownEventArgs reason) => _tcs.SetException(new OperationInterruptedException(reason));

protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
_ct.Dispose();
}

_disposedValue = true;
}
}

public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}

internal class ConnectionSecureOrTuneContinuation : AsyncRpcContinuation<ConnectionSecureOrTune>
{
public ConnectionSecureOrTuneContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
{
}

public override void HandleCommand(in IncomingCommand cmd)
{
try
{
if (cmd.CommandId == ProtocolCommandId.ConnectionSecure)
{
var secure = new ConnectionSecure(cmd.MethodBytes.Span);
_tcs.TrySetResult(new ConnectionSecureOrTune { m_challenge = secure._challenge });
}
else if (cmd.CommandId == ProtocolCommandId.ConnectionTune)
{
var tune = new ConnectionTune(cmd.MethodBytes.Span);
// TODO LRB #1347
// What to do if setting a result fails?
_tcs.TrySetResult(new ConnectionSecureOrTune
{
m_tuneDetails = new() { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat }
});
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}

internal class SimpleAsyncRpcContinuation : AsyncRpcContinuation<bool>
{
private readonly ProtocolCommandId _expectedCommandId;

public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan continuationTimeout) : base(continuationTimeout)
{
_expectedCommandId = expectedCommandId;
}

public override void HandleCommand(in IncomingCommand cmd)
{
try
{
if (cmd.CommandId == _expectedCommandId)
{
_tcs.TrySetResult(true);
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}

internal class ExchangeDeclareAsyncRpcContinuation : SimpleAsyncRpcContinuation
{
public ExchangeDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeclareOk, continuationTimeout)
{
}
}

internal class ExchangeDeleteAsyncRpcContinuation : SimpleAsyncRpcContinuation
{
public ExchangeDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeleteOk, continuationTimeout)
{
}
}

internal class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation<QueueDeclareOk>
{
public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
{
}

public override void HandleCommand(in IncomingCommand cmd)
{
try
{
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk)
{
_tcs.TrySetResult(result);
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}

internal class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation<QueueDeleteOk>
{
public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
{
}

public override void HandleCommand(in IncomingCommand cmd)
{
try
{
var result = new Client.Framing.Impl.QueueDeleteOk(cmd.MethodBytes.Span);
if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk)
{
_tcs.TrySetResult(result);
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}
}
Loading

0 comments on commit c6959f6

Please sign in to comment.