Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ShutdownComplete by Closed and ShutdownRequested #2396

Merged
merged 30 commits into from
Jan 2, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d8c701f
Replace ShutdownComplete by Task<Exception?> Closed and ShutdownReque…
bernardnormier Dec 27, 2022
50df1e7
Cleanup
bernardnormier Dec 27, 2022
ea4f976
Fix review comments
bernardnormier Dec 27, 2022
7f6c289
Allow invoke on connecting server connection
bernardnormier Dec 27, 2022
ced41d2
Fix comment
bernardnormier Dec 27, 2022
3c75f19
Fix review comments
bernardnormier Dec 28, 2022
f6aef04
Remove ConnectTimeoutProtocolConnectionDecorator
bernardnormier Dec 28, 2022
7a7e7de
Update Server.DisposeAsync to not shutdown the connections
bernardnormier Dec 28, 2022
adb7bd7
Update ConnectionCache.DisposeAsync to not shut down the connections
bernardnormier Dec 28, 2022
e992028
ClientConnection.DisposeAsync is not an abortive closure
bernardnormier Dec 28, 2022
5b3cdd8
Remove ShutdownTimeout decorator from Server
bernardnormier Dec 28, 2022
09ffa7a
Server.ShutdownAsync is no longer reentrant
bernardnormier Dec 29, 2022
62e01b4
ConnectionCache.ShutdownAsync can only be called once
bernardnormier Dec 29, 2022
5e39e9c
ClientConnection ShutdownAsync update
bernardnormier Dec 29, 2022
ca0bc39
New tests
bernardnormier Dec 29, 2022
9327b68
One more review comment fix
bernardnormier Dec 29, 2022
a86b93b
Move HoldDuplexTransportDecorator to Common
bernardnormier Dec 29, 2022
9cbc9df
Better tests
bernardnormier Dec 29, 2022
6f29dd9
Add Slic test
bernardnormier Dec 29, 2022
9e9fffe
Server fixes
bernardnormier Dec 29, 2022
7ca378b
Merge remote-tracking branch 'origin/main' into shutdown-requested
bernardnormier Dec 29, 2022
541c072
Rework ClientConnection + bug fixes
bernardnormier Dec 30, 2022
c42c36e
More client connection cache fixes
bernardnormier Dec 30, 2022
7f36cf3
Fix identation
bernardnormier Dec 30, 2022
b760eb3
Improve comments
bernardnormier Dec 30, 2022
6c7a278
Remove UTE from ClientConnection
bernardnormier Dec 30, 2022
d9eb2bb
Correctly clear _connectTask UTE
bernardnormier Dec 30, 2022
48f2f4a
Fix shutdown & locking
bernardnormier Jan 1, 2023
326090d
Merge branch 'main' of github.com:zeroc-ice/icerpc-csharp into shutdo…
bernardnormier Jan 2, 2023
e90d3fd
Fix review comments
bernardnormier Jan 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 275 additions & 125 deletions src/IceRpc/ClientConnection.cs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/IceRpc/ClientProtocolConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public IProtocolConnection CreateConnection(ServerAddress serverAddress)
_clientAuthenticationOptions),
transportConnectionInformation: null,
_connectionOptions);
#pragma warning restore CA2000

connection = new MetricsProtocolConnectionDecorator(connection);

return _logger == NullLogger.Instance ? connection :
new LogProtocolConnectionDecorator(connection, remoteNetworkAddress: null, _logger);
#pragma warning restore CA2000
}
}
167 changes: 109 additions & 58 deletions src/IceRpc/ConnectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ public sealed class ConnectionCache : IInvoker, IAsyncDisposable

private readonly IClientProtocolConnectionFactory _connectionFactory;

private readonly TimeSpan _connectTimeout;

private Task? _disposeTask;

private readonly object _mutex = new();

// New connections in the process of connecting. They can be returned only after ConnectAsync succeeds.
Expand All @@ -33,6 +37,8 @@ public sealed class ConnectionCache : IInvoker, IAsyncDisposable

private readonly CancellationTokenSource _shutdownCts = new();

private readonly TimeSpan _shutdownTimeout;

/// <summary>Constructs a connection cache.</summary>
/// <param name="options">The connection cache options.</param>
/// <param name="duplexClientTransport">The duplex transport used to create ice protocol connections.</param>
Expand All @@ -52,6 +58,9 @@ public ConnectionCache(
multiplexedClientTransport,
logger);

_connectTimeout = options.ConnectionOptions.ConnectTimeout;
_shutdownTimeout = options.ConnectionOptions.ShutdownTimeout;

_preferExistingConnection = options.PreferExistingConnection;
}

Expand All @@ -63,36 +72,35 @@ public ConnectionCache()

/// <summary>Releases all resources allocated by this connection cache.</summary>
/// <returns>A value task that completes when all connections managed by this cache are disposed.</returns>
public async ValueTask DisposeAsync()
public ValueTask DisposeAsync()
{
lock (_mutex)
{
// We always cancel _shutdownCts with _mutex locked. This way, when _mutex is locked, _shutdownCts.Token
// does not change.
try
if (_disposeTask is null)
{
_shutdownCts.Cancel();
if (_backgroundConnectionDisposeCount == 0)
{
// There is no outstanding background dispose.
_ = _backgroundConnectionDisposeTcs.TrySetResult();
}
_disposeTask = PerformDisposeAsync();
}
catch (ObjectDisposedException)
{
// already disposed by a previous or concurrent call.
}

if (_backgroundConnectionDisposeCount == 0)
{
// There is no outstanding background dispose.
_ = _backgroundConnectionDisposeTcs.TrySetResult();
}
return new(_disposeTask);
}

// Dispose all connections managed by this cache.
IEnumerable<IProtocolConnection> allConnections = _pendingConnections.Values.Select(value => value.Connection)
.Concat(_activeConnections.Values);
async Task PerformDisposeAsync()
{
await Task.Yield(); // exit mutex lock

IEnumerable<IProtocolConnection> allConnections =
_pendingConnections.Values.Select(value => value.Connection).Concat(_activeConnections.Values);

await Task.WhenAll(allConnections.Select(connection => connection.DisposeAsync().AsTask())
.Append(_backgroundConnectionDisposeTcs.Task)).ConfigureAwait(false);
await Task.WhenAll(allConnections.Select(connection => connection.DisposeAsync().AsTask())
.Append(_backgroundConnectionDisposeTcs.Task)).ConfigureAwait(false);

_shutdownCts.Dispose();
_shutdownCts.Dispose();
}
}

/// <summary>Sends an outgoing request and returns the corresponding incoming response. If the request
Expand Down Expand Up @@ -186,7 +194,9 @@ async Task<IncomingResponse> PerformInvokeAsync()
{
try
{
connection = await ConnectAsync(mainServerAddress, cancellationToken).ConfigureAwait(false);
// TODO: this code generates a UTE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't it be fixed with this PR? Or is there an issue we could reference here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll create an issue.

connection = await ConnectAsync(mainServerAddress).WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (Exception) when (serverAddressFeature.AltServerAddresses.Count > 0)
{
Expand All @@ -203,7 +213,7 @@ async Task<IncomingResponse> PerformInvokeAsync()

try
{
connection = await ConnectAsync(mainServerAddress, cancellationToken)
connection = await ConnectAsync(mainServerAddress).WaitAsync(cancellationToken)
.ConfigureAwait(false);
break; // for
}
Expand Down Expand Up @@ -238,52 +248,67 @@ public Task ShutdownAsync(CancellationToken cancellationToken = default)
{
lock (_mutex)
{
// We always cancel _shutdownCts with _mutex lock. This way, when _mutex is locked, _shutdownCts.Token
// does not change.
try
if (_disposeTask is not null)
{
_shutdownCts.Cancel();
throw new ObjectDisposedException($"{typeof(ConnectionCache)}");
}
catch (ObjectDisposedException)
if (_shutdownCts.IsCancellationRequested)
{
throw new ObjectDisposedException($"{typeof(ConnectionCache)}");
throw new InvalidOperationException($"The connection cache is already shut down or shutting down.");
}

// We always cancel _shutdownCts with _mutex locked. This way, when _mutex is locked, _shutdownCts.Token
// does not change.
_shutdownCts.Cancel();
}

// Shut down all connections managed by this cache.
IEnumerable<IProtocolConnection> allConnections = _pendingConnections.Values.Select(value => value.Connection)
.Concat(_activeConnections.Values);
return PerformShutdownAsync();

async Task PerformShutdownAsync()
{
IEnumerable<IProtocolConnection> allConnections =
_pendingConnections.Values.Select(value => value.Connection).Concat(_activeConnections.Values);

using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_shutdownTimeout);

return Task.WhenAll(allConnections.Select(connection => connection.ShutdownAsync(cancellationToken)));
try
{
// Note: this throws the first exception, not all of them.
await Task.WhenAll(allConnections.Select(connection => connection.ShutdownAsync(cts.Token)))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException(
$"The connection cache shutdown timed out after {_shutdownTimeout.TotalSeconds} s.");
}
}
}

/// <summary>Creates a connection and attempts to connect this connection unless there is an active or pending
/// connection for the desired server address.</summary>
/// <param name="serverAddress">The server address.</param>
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
/// <returns>A connected connection.</returns>
private async ValueTask<IProtocolConnection> ConnectAsync(
ServerAddress serverAddress,
CancellationToken cancellationToken)
private async Task<IProtocolConnection> ConnectAsync(ServerAddress serverAddress)
{
(IProtocolConnection Connection, Task Task) pendingConnectionValue;

CancellationToken shutdownCancellationToken;

lock (_mutex)
{
try
{
shutdownCancellationToken = _shutdownCts.Token;
}
catch (ObjectDisposedException)
if (_disposeTask is not null)
{
throw new ObjectDisposedException($"{typeof(ConnectionCache)}");
}

shutdownCancellationToken = _shutdownCts.Token;

if (shutdownCancellationToken.IsCancellationRequested)
{
throw new InvalidOperationException("connection cache is shut down or shutting down");
throw new InvalidOperationException("Connection cache is shut down or shutting down.");
}

if (_activeConnections.TryGetValue(serverAddress, out IProtocolConnection? connection))
Expand All @@ -309,12 +334,21 @@ async Task PerformConnectAsync(IProtocolConnection connection)
{
await Task.Yield(); // exit mutex lock

using var cts = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken,
shutdownCancellationToken);
using var cts = new CancellationTokenSource(_connectTimeout);
using CancellationTokenRegistration tokenRegistration =
shutdownCancellationToken.UnsafeRegister(cts => ((CancellationTokenSource)cts!).Cancel(), cts);

try
{
_ = await connection.ConnectAsync(cts.Token).ConfigureAwait(false);
try
{
_ = await connection.ConnectAsync(cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (!shutdownCancellationToken.IsCancellationRequested)
{
throw new TimeoutException(
$"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
}
Comment on lines +345 to +353
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would get rid of the inner try/catch with something along these lines:

try
{
    _ = await connection.ConnectAsync(cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (shutdownCancellationToken.IsCancellationRequested)
{
    throw new IceRpcException(
        IceRpcError.OperationAborted,
        "The connection cache was shut down or disposed.");
}
catch (Exception exception)
{
    lock (_mutex)
    {
        bool removed = _pendingConnections.Remove(serverAddress);
        Debug.Assert(removed);
    }
    
    await connection.DisposeAsync().ConfigureAwait(false);

    if (exception is OperationCanceledException)
    {
        throw new TimeoutException(
            $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
    }
    else
    {
        throw;
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion is not correct.

Once we lock the mutex, we have to check shutdownCancellationToken.IsCancellationRequested.

}
catch
{
Expand All @@ -337,8 +371,6 @@ async Task PerformConnectAsync(IProtocolConnection connection)
}

await connection.DisposeAsync().ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested(); // throws OCE
throw;
}

Expand All @@ -365,24 +397,22 @@ async Task PerformConnectAsync(IProtocolConnection connection)

async Task RemoveFromActiveAsync(IProtocolConnection connection, CancellationToken shutdownCancellationToken)
{
bool shutdownRequested;

try
{
await connection.ShutdownComplete.WaitAsync(shutdownCancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException exception) when (exception.CancellationToken == shutdownCancellationToken)
{
// The connection cache is being shut down or disposed and cache's DisposeAsync is responsible to
// DisposeAsync this connection.
return;
shutdownRequested = await Task.WhenAny(connection.ShutdownRequested, connection.Closed)
.WaitAsync(shutdownCancellationToken).ConfigureAwait(false) == connection.ShutdownRequested;
}
catch
catch (OperationCanceledException)
{
// ignore and continue: the connection was aborted
// The connection cache is being shut down or disposed. We handle it below after locking the mutex.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we simply return here and avoid check for shutdownCancellationToken.IsCancellationRequested in the mutex lock block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but I think it's cleaner that way.

shutdownRequested = false;
}
// no other exception can be thrown

lock (_mutex)
{
// shutdownCancellationToken.IsCancellationRequested remains the same when _mutex is locked.
if (shutdownCancellationToken.IsCancellationRequested)
{
// ConnectionCache.DisposeAsync is responsible to dispose this connection.
Expand All @@ -396,6 +426,27 @@ async Task RemoveFromActiveAsync(IProtocolConnection connection, CancellationTok
}
}

if (shutdownRequested)
{
using var cts = new CancellationTokenSource(_shutdownTimeout);

try
{
await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (IceRpcException)
{
}
catch (Exception exception)
{
Debug.Fail($"Unexpected connection shutdown exception: {exception}");
throw;
}
}

await connection.DisposeAsync().ConfigureAwait(false);

lock (_mutex)
Expand Down
2 changes: 1 addition & 1 deletion src/IceRpc/IConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface IConnectionContext
/// <summary>Gets a task that completes when the connection is shut down or aborted.</summary>
/// <value>A task that completes when the connection is successfully shut down. It completes with an exception when
/// the connection is aborted.</value>
Task ShutdownComplete { get; }
Task<Exception?> Closed { get; }

/// <summary>Gets the transport connection information.</summary>
TransportConnectionInformation TransportConnectionInformation { get; }
Expand Down
36 changes: 15 additions & 21 deletions src/IceRpc/IProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ namespace IceRpc;
/// </summary>
public interface IProtocolConnection : IInvoker, IAsyncDisposable
{
/// <summary>Gets a task that completes when the connection is closed.</summary>
/// <value>A task that completes when the connection is closed. If the connection was shut down gracefully, this
/// task completes with a null exception; otherwise, it completes with the exception that aborted the connection.
/// </value>
/// <remarks>This task is never faulted or canceled.</remarks>
Task<Exception?> Closed { get; }

/// <summary>Gets the server address of this connection.</summary>
/// <value>The server address of this connection. Its <see cref="ServerAddress.Transport" /> property is always
/// non-null.</value>
ServerAddress ServerAddress { get; }

/// <summary>Gets a task that completes when the connection is shut down or fails. The connection shutdown is
/// initiated by any of the following events:
/// <list type="bullet">
/// <item><description>The application calls <see cref="ShutdownAsync" /> on the connection.</description></item>
/// <item><description>The connection shuts down itself because it remained idle for longer than its configured idle
/// timeout.</description></item>
/// <item><description>The peer shuts down the connection.</description></item>
/// </list>
/// <summary>Gets a task that completes when the peer or the idle monitor requests the shutdown of this connection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it also complete when the connection is closed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it does not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it relied on the peer requesting shutdown? Is it guaranteed to be completed even if the connection is lost with the peer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can never complete, which is completely fine. It's like registering a callback that is never called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious that this can't create leaks since the task continuation will never run. If this continuation references objects, these might never be collected. It would be good to investigate this to verify that it's safe.

/// </summary>
/// <value>A task that completes when the connection is successfully shut down. It completes with an exception when
/// the connection fails.</value>
Task ShutdownComplete { get; }
/// <remarks>This task is never faulted or canceled.</remarks>
/// <seealso cref="ConnectionOptions.IdleTimeout" />
Task ShutdownRequested { get; }

/// <summary>Establishes the connection to the peer.</summary>
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
Expand All @@ -37,33 +37,27 @@ public interface IProtocolConnection : IInvoker, IAsyncDisposable
/// </item>
/// <item><description><see cref="OperationCanceledException" />if cancellation was requested through the
/// cancellation token.</description></item>
/// <item><description><see cref="TimeoutException" />if the connection establishment attempt exceeded <see
/// cref="ConnectionOptions.ConnectTimeout" />.</description></item>
/// </list>
/// </returns>
/// <exception cref="IceRpcException">Thrown if the connection is closed but not disposed yet.</exception>
/// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
/// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken = default);

/// <summary>Gracefully shuts down the connection. The shutdown waits for pending invocations and dispatches to
/// complete. For a speedier graceful shutdown, call <see cref="IAsyncDisposable.DisposeAsync" /> instead. It will
/// cancel pending invocations and dispatches.</summary>
/// <summary>Gracefully shuts down the connection.</summary>
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
/// <returns>A task that completes once the shutdown is complete. This task can also complete with one of the
/// following exceptions:
/// <list type="bullet">
/// <item><description><see cref="IceRpcException" />if the connection shutdown failed.</description></item>
/// <item><description><see cref="OperationCanceledException" />if cancellation was requested through the
/// cancellation token.</description></item>
/// <item><description><see cref="TimeoutException" />if this shutdown attempt or a previous attempt exceeded <see
/// cref="ConnectionOptions.ShutdownTimeout" />.</description></item>
/// </list>
/// </returns>
/// <exception cref="IceRpcException">Thrown if the connection is closed but not disposed yet.</exception>
/// <exception cref="InvalidOperationException">Thrown if the connection was not connected successfully prior to
/// call.</exception>
/// this call, or if this method is called more than once.</exception>
/// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
/// <remarks>If shutdown is canceled, the protocol connection transitions to a faulted state and the disposal of the
/// connection will abort the connection instead of performing a graceful speedy-shutdown.</remarks>
/// <remarks>If cancellation token is canceled, the protocol connection is aborted.</remarks>
Task ShutdownAsync(CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/IceRpc/Internal/ConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal sealed class ConnectionContext : IConnectionContext

public ServerAddress ServerAddress => _protocolConnection.ServerAddress;

public Task ShutdownComplete => _protocolConnection.ShutdownComplete;
public Task<Exception?> Closed => _protocolConnection.Closed;

public TransportConnectionInformation TransportConnectionInformation { get; }

Expand Down
Loading