Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8.0: implement WaitForConfirmsAsync #999

Merged
merged 1 commit into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 6 additions & 44 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@

using System;
using System.Collections.Generic;

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -470,68 +471,29 @@ string BasicConsume(
/// </summary>
void TxSelect();

/// <summary>Wait until all published messages have been confirmed.
/// </summary>
/// <remarks>
/// Waits until all messages published since the last call have
/// been either ack'd or nack'd by the broker. Returns whether
/// all the messages were ack'd (and none were nack'd). Note,
/// throws an exception when called on a non-Confirm channel.
/// </remarks>
bool WaitForConfirms();

/// <summary>
/// Wait until all published messages have been confirmed.
/// </summary>
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
/// <param name="timeout">How long to wait (at most) before returning
///whether or not any nacks were returned.
/// </param>
/// <param name="token">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published since the last call have
/// been either ack'd or nack'd by the broker. Returns whether
/// all the messages were ack'd (and none were nack'd). Note,
/// throws an exception when called on a non-Confirm channel.
/// </remarks>
bool WaitForConfirms(TimeSpan timeout);

/// <summary>
/// Wait until all published messages have been confirmed.
/// </summary>
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
/// <param name="timeout">How long to wait (at most) before returning
/// whether or not any nacks were returned.
/// </param>
/// <param name="timedOut">True if the method returned because
/// the timeout elapsed, not because all messages were ack'd or at least one nack'd.
/// </param>
/// <remarks>
/// Waits until all messages published since the last call have
/// been either ack'd or nack'd by the broker. Returns whether
/// all the messages were ack'd (and none were nack'd). Note,
/// throws an exception when called on a non-Confirm channel.
/// </remarks>
bool WaitForConfirms(TimeSpan timeout, out bool timedOut);

/// <summary>
/// Wait until all published messages have been confirmed.
/// </summary>
/// <remarks>
/// Waits until all messages published since the last call have
/// been ack'd by the broker. If a nack is received, throws an
/// OperationInterrupedException exception immediately.
/// </remarks>
void WaitForConfirmsOrDie();
Task<bool> WaitForConfirmsAsync(CancellationToken token = default);

/// <summary>
/// Wait until all published messages have been confirmed.
/// </summary>
/// <param name="token">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published since the last call have
/// been ack'd by the broker. If a nack is received or the timeout
/// elapses, throws an OperationInterrupedException exception immediately.
/// </remarks>
void WaitForConfirmsOrDie(TimeSpan timeout);
Task WaitForConfirmsOrDieAsync(CancellationToken token = default);

/// <summary>
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
Expand Down
14 changes: 4 additions & 10 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Xml.Schema;

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Framing.Impl;

Expand Down Expand Up @@ -1023,15 +1023,9 @@ public void TxSelect()
_usesTransactions = true;
}

public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) => Delegate.WaitForConfirms(timeout, out timedOut);

public bool WaitForConfirms(TimeSpan timeout) => Delegate.WaitForConfirms(timeout);

public bool WaitForConfirms() => Delegate.WaitForConfirms();

public void WaitForConfirmsOrDie() => Delegate.WaitForConfirmsOrDie();
public Task<bool> WaitForConfirmsAsync(CancellationToken token = default) => Delegate.WaitForConfirmsAsync(token);

public void WaitForConfirmsOrDie(TimeSpan timeout) => Delegate.WaitForConfirmsOrDie(timeout);
public Task WaitForConfirmsOrDieAsync(CancellationToken token = default) => Delegate.WaitForConfirmsOrDieAsync(token);

private void RecoverBasicAckHandlers()
{
Expand Down
13 changes: 2 additions & 11 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
_connectionBlockedWrapper = new EventingWrapper<ConnectionBlockedEventArgs>("OnConnectionBlocked", onException);
_connectionUnblockedWrapper = new EventingWrapper<EventArgs>("OnConnectionUnblocked", onException);

_sessionManager = new SessionManager(this, 0);
_sessionManager = new SessionManager(this, 0);
_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
_model0 = (ModelBase)Protocol.CreateModel(_session0);

Expand Down Expand Up @@ -426,15 +426,6 @@ public void FinishClose()
_model0.FinishClose();
}

/// <remarks>
/// We need to close the socket, otherwise attempting to unload the domain
/// could cause a CannotUnloadAppDomainException
/// </remarks>
public void HandleDomainUnload(object sender, EventArgs ea)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was unused

{
Abort(Constants.InternalError, "Domain Unload");
}

public void HandleMainLoopException(ShutdownEventArgs reason)
{
if (!SetCloseReason(reason))
Expand Down Expand Up @@ -750,7 +741,7 @@ public void MaybeStartHeartbeatTimers()

public void StartMainLoop()
{
_mainLoopTask = Task.Run((Action)MainLoop);
_mainLoopTask = Task.Factory.StartNew(MainLoop, TaskCreationOptions.LongRunning);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This forces the creation of a new thread instead of blocking one of the threadpool

}

public void HeartbeatReadTimerCallback(object state)
Expand Down
Loading