Skip to content

Commit

Permalink
Fix synchronization of semaphore in retry delegating handler (#3135)
Browse files Browse the repository at this point in the history
* Decouple client open-close semaphore from callback subscription semaphore

* Cancel pending operations when CloseAsync() is invoked
  • Loading branch information
abhipsaMisra authored Mar 7, 2023
1 parent 480a27b commit d297180
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 138 deletions.
2 changes: 1 addition & 1 deletion build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ Function BuildPackage($path, $message)
SignDotNetBinary $filesToSign
}

& dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --output $localPackages
& dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --property:PackageOutputPath=$localPackages

if ($LASTEXITCODE -ne 0)
{
Expand Down
1 change: 1 addition & 0 deletions e2e/test/iothub/FileUploadE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public async Task FileUpload_SmallFile_Http_GranularSteps_x509()
[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[TestCategory("LongRunning")]
[TestCategory("Proxy")]
public async Task FileUpload_SmallFile_Http_GranularSteps_Proxy()
{
string filename = await GetTestFileNameAsync(FileSizeSmall).ConfigureAwait(false);
Expand Down
1 change: 1 addition & 0 deletions e2e/test/iothub/service/RegistryManagerE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public async Task RegistryManager_RemoveDevices2Async_Works()

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[TestCategory("Proxy")]
public async Task RegistryManager_AddDeviceWithProxy()
{
string deviceId = _idPrefix + Guid.NewGuid();
Expand Down
5 changes: 3 additions & 2 deletions iothub/device/src/Pipeline/DefaultDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ namespace Microsoft.Azure.Devices.Client.Transport
{
internal abstract class DefaultDelegatingHandler : IDelegatingHandler
{
private volatile IDelegatingHandler _innerHandler;
protected const string ClientDisposedMessage = "The client has been disposed and is no longer usable.";
protected volatile bool _isDisposed;
private volatile IDelegatingHandler _innerHandler;

protected DefaultDelegatingHandler(PipelineContext context, IDelegatingHandler innerHandler)
{
Expand Down Expand Up @@ -209,7 +210,7 @@ protected void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException("IoT hub client");
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}
}

Expand Down
416 changes: 297 additions & 119 deletions iothub/device/src/Pipeline/RetryDelegatingHandler.cs

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions iothub/device/tests/Pipeline/RetryDelegatingHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries()
// arrange
int callCounter = 0;

var ct = CancellationToken.None;
PipelineContext contextMock = Substitute.For<PipelineContext>();
contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
IDelegatingHandler innerHandlerMock = Substitute.For<IDelegatingHandler>();

innerHandlerMock
.OpenAsync(ct)
.OpenAsync(Arg.Any<CancellationToken>())
.Returns(t =>
{
return ++callCounter == 1
Expand All @@ -45,7 +44,7 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries()
var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock);

// act
await retryDelegatingHandler.OpenAsync(ct).ConfigureAwait(false);
await retryDelegatingHandler.OpenAsync(CancellationToken.None).ConfigureAwait(false);

// assert
callCounter.Should().Be(2);
Expand Down Expand Up @@ -255,12 +254,12 @@ public async Task DeviceNotFoundExceptionReturnsDeviceDisabledStatus()
public async Task RetryTransientErrorThrownAfterNumberOfRetriesThrows()
{
// arrange
using var cts = new CancellationTokenSource(100);
using var cts = new CancellationTokenSource(1000);
var contextMock = Substitute.For<PipelineContext>();
contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock
.OpenAsync(cts.Token)
.OpenAsync(Arg.Any<CancellationToken>())
.Returns(t => throw new IotHubException(TestExceptionMessage, isTransient: true));

var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
Expand Down Expand Up @@ -352,12 +351,12 @@ public async Task RetrySetRetryPolicyVerifyInternalsSuccess()
delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { });
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);

var retryPolicy = new TestRetryPolicy();
var retryPolicy = new TestRetryPolicyRetryTwice();
sut.SetRetryPolicy(retryPolicy);

int innerHandlerCallCounter = 0;

innerHandlerMock.OpenAsync(CancellationToken.None).Returns(t =>
innerHandlerMock.OpenAsync(Arg.Any<CancellationToken>()).Returns(t =>
{
innerHandlerCallCounter++;
throw new IotHubCommunicationException();
Expand Down Expand Up @@ -397,7 +396,7 @@ public async Task RetryCancellationTokenCanceledAbandon()
{
// arrange
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock.AbandonAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
innerHandlerMock.AbandonAsync(null, Arg.Any<CancellationToken>()).ReturnsForAnyArgs(TaskHelpers.CompletedTask);

var contextMock = Substitute.For<PipelineContext>();
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
Expand All @@ -416,7 +415,7 @@ public async Task RetryCancellationTokenCanceledReject()
{
// arrange
var innerHandlerMock = Substitute.For<IDelegatingHandler>();
innerHandlerMock.RejectAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask);
innerHandlerMock.RejectAsync(null, Arg.Any<CancellationToken>()).ReturnsForAnyArgs(TaskHelpers.CompletedTask);

var contextMock = Substitute.For<PipelineContext>();
var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock);
Expand All @@ -427,7 +426,7 @@ public async Task RetryCancellationTokenCanceledReject()
await sut.RejectAsync(Arg.Any<string>(), cts.Token).ExpectedAsync<TaskCanceledException>().ConfigureAwait(false);
}

private class TestRetryPolicy : IRetryPolicy
private class TestRetryPolicyRetryTwice : IRetryPolicy
{
public int Counter { get; private set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
Expand All @@ -13,6 +16,7 @@
namespace Microsoft.Azure.Devices.Client.Tests.Amqp
{
[TestClass]
[TestCategory("Unit")]
public class AmqpConnectionPoolTests
{
internal class AmqpConnectionPoolTest : AmqpConnectionPool
Expand Down
6 changes: 1 addition & 5 deletions vsts/vsts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -548,11 +548,7 @@ jobs:
pool:
vmImage: windows-2022
steps:
- script: |
rem Run dotnet first experience.
dotnet new
rem Start build
build.cmd -clean -build -configuration Debug -package
- powershell: .\build.ps1 -clean -build -configutaion Debug -package
displayName: Build Package

- task: ComponentGovernanceComponentDetection@0
Expand Down

0 comments on commit d297180

Please sign in to comment.