From 752923cb0ac5dc7c56b6fe2f14e8863585870d3b Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 26 Feb 2024 08:56:17 -0800 Subject: [PATCH] Continue removing sync API Part of #1472 * Remove the last sync I/O method from `IFrameHandler` Update action versions to address Node.js warnings * Remove use of `EnsureCompleted` * Handle connection start using async/await * Remove `EnsureCompleted` task extensions. * Remove `BasicConsumeRpcContinuation` * Update action versions to address Node.js warnings * Remove non-async RPC continuation classes * Remove unused classes * Address some TODOs * Set a `TimeoutException` when an async RPC continuation times out --- .github/workflows/build-test.yaml | 34 ++-- .github/workflows/oauth2.yaml | 4 +- .github/workflows/publish-nuget.yaml | 2 +- ...orking_BasicDeliver_LongLivedConnection.cs | 4 +- .../RabbitMQ.Client/client/TaskExtensions.cs | 28 ---- .../RabbitMQ.Client/client/framing/Channel.cs | 15 +- .../client/impl/AsyncRpcContinuations.cs | 1 + .../client/impl/AutorecoveringConnection.cs | 8 - .../client/impl/ChannelBase.cs | 156 ++---------------- .../client/impl/Connection.Commands.cs | 3 +- .../RabbitMQ.Client/client/impl/Connection.cs | 5 - .../client/impl/IFrameHandler.cs | 1 - .../client/impl/RpcContinuations.cs | 95 ----------- .../client/impl/ShutdownContinuation.cs | 72 -------- .../client/impl/SocketFrameHandler.cs | 15 -- projects/RabbitMQ.Client/util/BlockingCell.cs | 93 ----------- projects/RabbitMQ.Client/util/Either.cs | 74 --------- projects/Test/Unit/TestBlockingCell.cs | 147 ----------------- .../Test/Unit/TestRpcContinuationQueue.cs | 17 +- 19 files changed, 53 insertions(+), 721 deletions(-) delete mode 100644 projects/RabbitMQ.Client/client/impl/RpcContinuations.cs delete mode 100644 projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs delete mode 100644 projects/RabbitMQ.Client/util/BlockingCell.cs delete mode 100644 projects/RabbitMQ.Client/util/Either.cs delete mode 100644 projects/Test/Unit/TestBlockingCell.cs diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 95049dd95a..c4dfb42ab2 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -15,7 +15,7 @@ jobs: with: submodules: true - name: Cache NuGet packages - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.nuget/packages @@ -32,7 +32,7 @@ jobs: - name: Unit Tests run: dotnet test "${{ github.workspace }}\projects\Test\Unit\Unit.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Upload Build (Debug) - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: rabbitmq-dotnet-client-build-win32 path: | @@ -52,14 +52,14 @@ jobs: with: submodules: true - name: Cache installers - uses: actions/cache@v3 + uses: actions/cache@v4 with: # Note: the cache path is relative to the workspace directory # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action path: ~/installers key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} - name: Download Build (Debug) - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: rabbitmq-dotnet-client-build-win32 path: projects @@ -81,7 +81,7 @@ jobs: "${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: rabbitmq-logs-integration-win32 path: ~/AppData/Roaming/RabbitMQ/log/ @@ -97,14 +97,14 @@ jobs: with: submodules: true - name: Cache installers - uses: actions/cache@v3 + uses: actions/cache@v4 with: # Note: the cache path is relative to the workspace directory # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action path: ~/installers key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} - name: Download Build (Debug) - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: rabbitmq-dotnet-client-build-win32 path: projects @@ -115,7 +115,7 @@ jobs: run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: rabbitmq-logs-sequential-integration-win32 path: ~/AppData/Roaming/RabbitMQ/log/ @@ -128,11 +128,11 @@ jobs: with: submodules: true - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: 6.x - name: Cache NuGet packages - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.nuget/packages @@ -149,7 +149,7 @@ jobs: - name: Unit Tests run: dotnet test "${{ github.workspace }}/projects/Test/Unit/Unit.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Upload Build (Debug) - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: rabbitmq-dotnet-client-build-ubuntu path: | @@ -166,11 +166,11 @@ jobs: with: submodules: true - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: 6.x - name: Download Build (Debug) - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: rabbitmq-dotnet-client-build-ubuntu path: projects @@ -188,7 +188,7 @@ jobs: "${{ github.workspace }}/projects/Test/Integration/Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: rabbitmq-logs-integration-ubuntu path: ${{ github.workspace }}/.ci/ubuntu/log/ @@ -201,11 +201,11 @@ jobs: with: submodules: true - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: 6.x - name: Download Build (Debug) - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: rabbitmq-dotnet-client-build-ubuntu path: projects @@ -219,7 +219,7 @@ jobs: "${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: rabbitmq-logs-sequential-integration-ubuntu path: ${{ github.workspace }}/.ci/ubuntu/log/ diff --git a/.github/workflows/oauth2.yaml b/.github/workflows/oauth2.yaml index dd2e1998c7..efbe4ffee4 100644 --- a/.github/workflows/oauth2.yaml +++ b/.github/workflows/oauth2.yaml @@ -17,10 +17,10 @@ jobs: - uses: actions/checkout@v4 with: submodules: true - - uses: actions/setup-dotnet@v3 + - uses: actions/setup-dotnet@v4 with: dotnet-version: 6.x - - uses: actions/cache@v3 + - uses: actions/cache@v4 with: path: | ~/.nuget/packages diff --git a/.github/workflows/publish-nuget.yaml b/.github/workflows/publish-nuget.yaml index da1af344c4..4e68e670fa 100644 --- a/.github/workflows/publish-nuget.yaml +++ b/.github/workflows/publish-nuget.yaml @@ -13,7 +13,7 @@ jobs: - uses: actions/checkout@v4 with: submodules: true - - uses: actions/cache@v3 + - uses: actions/cache@v4 with: path: | ~/.nuget/packages diff --git a/projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs b/projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs index d60adbdc3d..78bab87234 100644 --- a/projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs +++ b/projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs @@ -22,7 +22,7 @@ public void GlobalSetup() var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 }; // TODO / NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738 - _connection = cf.CreateConnectionAsync().EnsureCompleted(); + _connection = EnsureCompleted(cf.CreateConnectionAsync()); } [GlobalCleanup] @@ -37,5 +37,7 @@ public Task Publish_Hello_World() { return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body); } + + private static T EnsureCompleted(Task task) => task.GetAwaiter().GetResult(); } } diff --git a/projects/RabbitMQ.Client/client/TaskExtensions.cs b/projects/RabbitMQ.Client/client/TaskExtensions.cs index d8458d2dc4..99d7511b82 100644 --- a/projects/RabbitMQ.Client/client/TaskExtensions.cs +++ b/projects/RabbitMQ.Client/client/TaskExtensions.cs @@ -203,34 +203,6 @@ static async ValueTask DoTimeoutAfter(ValueTask valueTask, TimeSpan timeout) #endif } - /* - * https://devblogs.microsoft.com/dotnet/configureawait-faq/ - * I'm using GetAwaiter().GetResult(). Do I need to use ConfigureAwait(false)? - * Answer: No - */ - public static void EnsureCompleted(this Task task) - { - task.GetAwaiter().GetResult(); - } - - public static T EnsureCompleted(this Task task) - { - return task.GetAwaiter().GetResult(); - } - - public static T EnsureCompleted(this ValueTask task) - { - return task.GetAwaiter().GetResult(); - } - - public static void EnsureCompleted(this ValueTask task) - { - if (false == task.IsCompletedSuccessfully) - { - task.GetAwaiter().GetResult(); - } - } - #if NETSTANDARD // https://github.com/dotnet/runtime/issues/23878 // https://github.com/dotnet/runtime/issues/23878#issuecomment-1398958645 diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index b28b55f3c3..d18324f724 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -108,23 +108,19 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.BasicCancelOk: { - bool result = HandleBasicCancelOk(in cmd); - return Task.FromResult(result); + return Task.FromResult(false); } case ProtocolCommandId.BasicConsumeOk: { - bool result = HandleBasicConsumeOk(in cmd); - return Task.FromResult(result); + return Task.FromResult(false); } case ProtocolCommandId.BasicGetEmpty: { - bool result = HandleBasicGetEmpty(in cmd); - return Task.FromResult(result); + return Task.FromResult(false); } case ProtocolCommandId.BasicGetOk: { - bool result = HandleBasicGetOk(in cmd); - return Task.FromResult(result); + return Task.FromResult(false); } case ProtocolCommandId.BasicNack: { @@ -165,8 +161,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionStart: { - HandleConnectionStart(in cmd); - return Task.FromResult(true); + return HandleConnectionStartAsync(cmd, cancellationToken); } case ProtocolCommandId.ConnectionTune: { diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index 00f2a272c5..2a8d3867f0 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -68,6 +68,7 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout) // TODO LRB rabbitmq/rabbitmq-dotnet-client#1347 // Cancellation was successful, does this mean we should set a TimeoutException // in the same manner as BlockingCell? + tcs.SetException(new TimeoutException("TODO LRB rabbitmq/rabbitmq-dotnet-client#1347")); } }, _tcs); #else diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 02afa9b107..a44dcbe4fb 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -65,14 +65,6 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end _endpoints = endpoints; } - internal IConnection Open() - { - IFrameHandler fh = _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, CancellationToken.None).EnsureCompleted(); - CreateInnerConnection(fh); - _innerConnection.Open(); - return this; - } - internal async ValueTask OpenAsync(CancellationToken cancellationToken) { IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index e01427b66b..5f2177bccd 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -277,6 +277,7 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok using var timeoutTokenSource = new CancellationTokenSource(HandshakeContinuationTimeout); using var lts = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken); var m = new ConnectionOpen(virtualHost); + // Note: must be awaited or else the timeoutTokenSource instance will be disposed await ModelSendAsync(m, lts.Token).ConfigureAwait(false); } @@ -590,7 +591,7 @@ protected void HandleBasicCancel(in IncomingCommand cmd) { try { - var consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag; + string consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag; ConsumerDispatcher.HandleBasicCancel(consumerTag); } finally @@ -599,53 +600,6 @@ protected void HandleBasicCancel(in IncomingCommand cmd) } } - protected bool HandleBasicCancelOk(in IncomingCommand cmd) - { - if (_continuationQueue.TryPeek(out var k)) - { - try - { - _continuationQueue.Next(); - string consumerTag = new Client.Framing.Impl.BasicCancelOk(cmd.MethodSpan)._consumerTag; - ConsumerDispatcher.HandleBasicCancelOk(consumerTag); - k.HandleCommand(IncomingCommand.Empty); // release the continuation. - return true; - } - finally - { - cmd.ReturnBuffers(); - } - } - else - { - return false; - } - } - - protected bool HandleBasicConsumeOk(in IncomingCommand cmd) - { - if (_continuationQueue.TryPeek(out var k)) - { - try - { - _continuationQueue.Next(); - var consumerTag = new Client.Framing.Impl.BasicConsumeOk(cmd.MethodSpan)._consumerTag; - k.m_consumerTag = consumerTag; - ConsumerDispatcher.HandleBasicConsumeOk(k.m_consumer, consumerTag); - k.HandleCommand(IncomingCommand.Empty); // release the continuation. - return true; - } - finally - { - cmd.ReturnBuffers(); - } - } - else - { - return false; - } - } - protected void HandleBasicDeliver(in IncomingCommand cmd) { try @@ -671,65 +625,11 @@ protected void HandleBasicDeliver(in IncomingCommand cmd) } } - protected bool HandleBasicGetOk(in IncomingCommand cmd) - { - if (_continuationQueue.TryPeek(out var k)) - { - try - { - var method = new BasicGetOk(cmd.MethodSpan); - var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); - _continuationQueue.Next(); - k.m_result = new BasicGetResult( - AdjustDeliveryTag(method._deliveryTag), - method._redelivered, - method._exchange, - method._routingKey, - method._messageCount, - header, - cmd.Body.ToArray()); - k.HandleCommand(IncomingCommand.Empty); // release the continuation. - return true; - } - finally - { - // Note: since we copy the body buffer above, we want to return all buffers here - cmd.ReturnBuffers(); - } - } - else - { - return false; - } - } - protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) { return deliveryTag; } - protected bool HandleBasicGetEmpty(in IncomingCommand cmd) - { - if (_continuationQueue.TryPeek(out var k)) - { - try - { - _continuationQueue.Next(); - k.m_result = null; - k.HandleCommand(IncomingCommand.Empty); // release the continuation. - return true; - } - finally - { - cmd.ReturnBuffers(); - } - } - else - { - return false; - } - } - protected void HandleBasicReturn(in IncomingCommand cmd) { try @@ -830,7 +730,7 @@ protected void HandleConnectionBlocked(in IncomingCommand cmd) { try { - var reason = new ConnectionBlocked(cmd.MethodSpan)._reason; + string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; Session.Connection.HandleConnectionBlocked(reason); } finally @@ -877,17 +777,16 @@ protected void HandleConnectionSecure(in IncomingCommand cmd) k.HandleCommand(IncomingCommand.Empty); // release the continuation. } - protected void HandleConnectionStart(in IncomingCommand cmd) + protected async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { if (m_connectionStartCell is null) { var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); - // // TODO async / cancellation token - Session.Connection.CloseAsync(reason, false, + await Session.Connection.CloseAsync(reason, false, InternalConstants.DefaultConnectionCloseTimeout, - CancellationToken.None).EnsureCompleted(); + cancellationToken); } else { @@ -903,6 +802,8 @@ protected void HandleConnectionStart(in IncomingCommand cmd) m_connectionStartCell.SetResult(details); m_connectionStartCell = null; } + + return true; } finally { @@ -931,27 +832,10 @@ protected void HandleConnectionUnblocked(in IncomingCommand cmd) } } + // TODO rabbitmq-dotnet-client-1472 remove this method protected bool HandleQueueDeclareOk(in IncomingCommand cmd) { - if (_continuationQueue.TryPeek(out var k)) - { - try - { - _continuationQueue.Next(); - var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodSpan); - k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); - k.HandleCommand(IncomingCommand.Empty); // release the continuation. - return true; - } - finally - { - cmd.ReturnBuffers(); - } - } - else - { - return false; - } + return false; } public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken); @@ -1220,26 +1104,6 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac } } -#if FOO - BasicProperties props = default; - if (basicProperties is BasicProperties properties) - { - props = properties; - } - else if (basicProperties is EmptyBasicProperty) - { - props = new BasicProperties(); - } - - var headers = props.Headers ?? new Dictionary(); - - // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties); - props.Headers = headers; - return props; - } -#endif - public async Task UpdateSecretAsync(string newSecret, string reason) { if (newSecret is null) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index 8caa3e2843..a80b11806f 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -113,8 +113,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken) if (!serverVersion.Equals(Protocol.Version)) { TerminateMainloop(); - // TODO hmmm - FinishCloseAsync(CancellationToken.None).EnsureCompleted(); + await FinishCloseAsync(cancellationToken); throw new ProtocolVersionMismatchException(Protocol.MajorVersion, Protocol.MinorVersion, serverVersion.Major, serverVersion.Minor); } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 797a534209..26c1969898 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -213,11 +213,6 @@ internal void TakeOver(Connection other) _connectionShutdownWrapper.Takeover(other._connectionShutdownWrapper); } - internal IConnection Open() - { - return OpenAsync(CancellationToken.None).EnsureCompleted(); - } - internal async ValueTask OpenAsync(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); diff --git a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs index 8b2f161dea..f0667ed551 100644 --- a/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/IFrameHandler.cs @@ -70,7 +70,6 @@ internal interface IFrameHandler Task SendProtocolHeaderAsync(CancellationToken cancellationToken); - void Write(RentedMemory frames); // TODO remove, should be async only ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationToken); } } diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs deleted file mode 100644 index 2b2dbad895..0000000000 --- a/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs +++ /dev/null @@ -1,95 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using RabbitMQ.Client.Exceptions; -using RabbitMQ.Util; - -namespace RabbitMQ.Client.Impl -{ - internal class SimpleBlockingRpcContinuation : IRpcContinuation - { - private readonly BlockingCell> m_cell = new BlockingCell>(); - internal DateTime StartTime { get; } = DateTime.UtcNow; - - public void GetReply(TimeSpan timeout) - { - Either result = m_cell.WaitForValue(timeout); - if (result.Alternative == EitherAlternative.Left) - { - return; - } - ThrowOperationInterruptedException(result.RightValue); - } - - public void GetReply(TimeSpan timeout, out IncomingCommand reply) - { - Either result = m_cell.WaitForValue(timeout); - if (result.Alternative == EitherAlternative.Left) - { - reply = result.LeftValue; - return; - } - - reply = IncomingCommand.Empty; - ThrowOperationInterruptedException(result.RightValue); - } - - private static void ThrowOperationInterruptedException(ShutdownEventArgs shutdownEventArgs) - => throw new OperationInterruptedException(shutdownEventArgs); - - public void HandleCommand(in IncomingCommand cmd) - { - m_cell.ContinueWithValue(Either.Left(cmd)); - } - - public void HandleChannelShutdown(ShutdownEventArgs reason) - { - m_cell.ContinueWithValue(Either.Right(reason)); - } - } - - internal class BasicConsumeRpcContinuation : SimpleBlockingRpcContinuation - { - public IBasicConsumer m_consumer; - public string m_consumerTag; - } - - internal class BasicGetRpcContinuation : SimpleBlockingRpcContinuation - { - public BasicGetResult m_result; - } - - internal class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation - { - public QueueDeclareOk m_result; - } -} diff --git a/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs b/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs deleted file mode 100644 index 9992595ab3..0000000000 --- a/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs +++ /dev/null @@ -1,72 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; - -using RabbitMQ.Util; - -namespace RabbitMQ.Client.Impl -{ - internal class ShutdownContinuation - { - public readonly BlockingCell m_cell = new BlockingCell(); - - // You will note there are two practically identical overloads - // of OnShutdown() here. This is because Microsoft's C# - // compilers do not consistently support the Liskov - // substitutability principle. When I use - // OnShutdown(object,ShutdownEventArgs), the compilers - // complain that OnShutdown can't be placed into a - // ConnectionShutdownEventHandler because object doesn't - // "match" IConnection, even though there's no context in - // which the program could Go Wrong were it to accept the - // code. The same problem appears for - // ChannelShutdownEventHandler. The .NET 1.1 compiler complains - // about these two cases, and the .NET 2.0 compiler does not - - // presumably they improved the type checker with the new - // release of the compiler. - - public virtual void OnConnectionShutdown(object sender, ShutdownEventArgs reason) - { - m_cell.ContinueWithValue(reason); - } - - public virtual ShutdownEventArgs Wait() - { - return m_cell.WaitForValue(); - } - - public ShutdownEventArgs Wait(TimeSpan timeout) - { - return m_cell.WaitForValue(timeout); - } - } -} diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index 115ab15ecb..12ab187651 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -296,21 +296,6 @@ await _pipeWriter.FlushAsync(cancellationToken) .ConfigureAwait(false); } - public void Write(RentedMemory frames) - { - if (_closed) - { - frames.Dispose(); - } - else - { - if (false == _channelWriter.TryWrite(frames)) - { - // TODO what to do here? - } - } - } - public ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationToken) { if (_closed) diff --git a/projects/RabbitMQ.Client/util/BlockingCell.cs b/projects/RabbitMQ.Client/util/BlockingCell.cs deleted file mode 100644 index 356fc91429..0000000000 --- a/projects/RabbitMQ.Client/util/BlockingCell.cs +++ /dev/null @@ -1,93 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading; - -namespace RabbitMQ.Util -{ - ///A thread-safe single-assignment reference cell. - /// - ///A fresh BlockingCell holds no value (is empty). Any thread - ///reading the Value property when the cell is empty will block - ///until a value is made available by some other thread. The Value - ///property can only be set once - on the first call, the - ///BlockingCell is considered full, and made immutable. Further - ///attempts to set Value result in a thrown - ///InvalidOperationException. - /// - internal class BlockingCell - { - private readonly ManualResetEventSlim _manualResetEventSlim = new ManualResetEventSlim(false); - private T _value = default; - - public void ContinueWithValue(T value) - { - _value = value; - _manualResetEventSlim.Set(); - } - - ///Retrieve the cell's value, waiting for the given - ///timeout if no value is immediately available. - /// - /// - /// If a value is present in the cell at the time the call is - /// made, the call will return immediately. Otherwise, the - /// calling thread blocks until either a value appears, or - /// operation times out. - /// - /// - /// If no value was available before the timeout, an exception - /// is thrown. - /// - /// - /// - public T WaitForValue(TimeSpan timeout) - { - if (_manualResetEventSlim.Wait(timeout)) - { - return _value; - } - - // TODO do not use System.TimeoutException here - throw new TimeoutException(); - } - - ///Retrieve the cell's value, blocking if none exists - ///at present, or supply a value to an empty cell, thereby - ///filling it. - /// - public T WaitForValue() - { - return WaitForValue(TimeSpan.FromMinutes(60)); - } - } -} diff --git a/projects/RabbitMQ.Client/util/Either.cs b/projects/RabbitMQ.Client/util/Either.cs deleted file mode 100644 index 6ef9bdbeee..0000000000 --- a/projects/RabbitMQ.Client/util/Either.cs +++ /dev/null @@ -1,74 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -namespace RabbitMQ.Util -{ - ///Used internally by class Either. - internal enum EitherAlternative - { - Left, - Right - } - - - ///Models the disjoint union of two alternatives, a - ///"left" alternative and a "right" alternative. - ///Borrowed from ML, Haskell etc. - internal class Either - { - ///Private constructor. Use the static methods Left, Right instead. - private Either(EitherAlternative alternative, in L valueL, R valueR) - { - Alternative = alternative; - LeftValue = valueL; - RightValue = valueR; - } - - ///Retrieve the alternative represented by this instance. - public EitherAlternative Alternative { get; } - - ///Retrieve the value carried by this instance. - public L LeftValue { get; } - public R RightValue { get; } - - ///Constructs an Either instance representing a Left alternative. - public static Either Left(in L value) - { - return new Either(EitherAlternative.Left, value, default); - } - - ///Constructs an Either instance representing a Right alternative. - public static Either Right(R value) - { - return new Either(EitherAlternative.Right, default, value); - } - } -} diff --git a/projects/Test/Unit/TestBlockingCell.cs b/projects/Test/Unit/TestBlockingCell.cs deleted file mode 100644 index 9439e4298a..0000000000 --- a/projects/Test/Unit/TestBlockingCell.cs +++ /dev/null @@ -1,147 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading; -using RabbitMQ.Util; -using Xunit; - -namespace Test.Unit -{ - public class TestBlockingCell : TimingFixture - { - internal class DelayedSetter - { - public BlockingCell m_k; - public TimeSpan m_delay; - public T m_v; - public void Run() - { - Thread.Sleep(m_delay); - m_k.ContinueWithValue(m_v); - } - } - - internal static void SetAfter(TimeSpan delay, BlockingCell k, T v) - { - var ds = new DelayedSetter - { - m_k = k, - m_delay = delay, - m_v = v - }; - new Thread(new ThreadStart(ds.Run)).Start(); - } - - public DateTime m_startTime; - - private void ResetTimer() - { - m_startTime = DateTime.Now; - } - - public TimeSpan ElapsedMs() - { - return DateTime.Now - m_startTime; - } - - [Fact] - public void TestSetBeforeGet() - { - var k = new BlockingCell(); - k.ContinueWithValue(123); - Assert.Equal(123, k.WaitForValue()); - } - - [Fact] - public void TestGetValueWhichDoesNotTimeOut() - { - var k = new BlockingCell(); - k.ContinueWithValue(123); - - ResetTimer(); - int v = k.WaitForValue(TimingInterval); - Assert.True(SafetyMargin > ElapsedMs()); - Assert.Equal(123, v); - } - - [Fact] - public void TestGetValueWhichDoesTimeOut() - { - var k = new BlockingCell(); - ResetTimer(); - Assert.Throws(() => k.WaitForValue(TimingInterval)); - } - - [Fact] - public void TestGetValueWhichDoesTimeOutWithTimeSpan() - { - var k = new BlockingCell(); - ResetTimer(); - Assert.Throws(() => k.WaitForValue(TimingInterval)); - } - - [Fact] - public void TestBackgroundUpdateSucceedsWithTimeSpan() - { - var k = new BlockingCell(); - SetAfter(TimingInterval, k, 123); - - ResetTimer(); - int v = k.WaitForValue(TimingInterval_2X); - Assert.True(TimingInterval - SafetyMargin < ElapsedMs()); - Assert.Equal(123, v); - } - - [Fact] - public void TestBackgroundUpdateSucceedsWithInfiniteTimeoutTimeSpan() - { - var k = new BlockingCell(); - SetAfter(TimingInterval, k, 123); - - ResetTimer(); - TimeSpan infiniteTimeSpan = Timeout.InfiniteTimeSpan; - int v = k.WaitForValue(infiniteTimeSpan); - Assert.True(TimingInterval - SafetyMargin < ElapsedMs()); - Assert.Equal(123, v); - } - - [Fact] - public void TestBackgroundUpdateFails() - { - var k = new BlockingCell(); - SetAfter(TimingInterval_16X, k, 123); - - ResetTimer(); - Assert.Throws(() => k.WaitForValue(TimingInterval)); - } - } -} diff --git a/projects/Test/Unit/TestRpcContinuationQueue.cs b/projects/Test/Unit/TestRpcContinuationQueue.cs index f3fcfaea80..bca0d3c4e4 100644 --- a/projects/Test/Unit/TestRpcContinuationQueue.cs +++ b/projects/Test/Unit/TestRpcContinuationQueue.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using RabbitMQ.Client.client.framing; using RabbitMQ.Client.Impl; using Xunit; @@ -37,11 +38,19 @@ namespace Test.Unit { public class TestRpcContinuationQueue { + private class TestSimpleAsyncRpcContinuation : SimpleAsyncRpcContinuation + { + public TestSimpleAsyncRpcContinuation() + : base(ProtocolCommandId.BasicGet, TimeSpan.FromSeconds(10)) + { + } + } + [Fact] public void TestRpcContinuationQueueEnqueueAndRelease() { RpcContinuationQueue queue = new RpcContinuationQueue(); - var inputContinuation = new SimpleBlockingRpcContinuation(); + var inputContinuation = new TestSimpleAsyncRpcContinuation(); queue.Enqueue(inputContinuation); IRpcContinuation outputContinuation = queue.Next(); Assert.Equal(outputContinuation, inputContinuation); @@ -51,7 +60,7 @@ public void TestRpcContinuationQueueEnqueueAndRelease() public void TestRpcContinuationQueueEnqueueAndRelease2() { RpcContinuationQueue queue = new RpcContinuationQueue(); - var inputContinuation = new SimpleBlockingRpcContinuation(); + var inputContinuation = new TestSimpleAsyncRpcContinuation(); queue.Enqueue(inputContinuation); IRpcContinuation outputContinuation = queue.Next(); Assert.Equal(outputContinuation, inputContinuation); @@ -63,8 +72,8 @@ public void TestRpcContinuationQueueEnqueueAndRelease2() public void TestRpcContinuationQueueEnqueue2() { RpcContinuationQueue queue = new RpcContinuationQueue(); - var inputContinuation = new SimpleBlockingRpcContinuation(); - var inputContinuation1 = new SimpleBlockingRpcContinuation(); + var inputContinuation = new TestSimpleAsyncRpcContinuation(); + var inputContinuation1 = new TestSimpleAsyncRpcContinuation(); queue.Enqueue(inputContinuation); Assert.Throws(() => {