Skip to content

Commit

Permalink
fix:(device): Select correct AMQP link for a Module Twin v. and Edge …
Browse files Browse the repository at this point in the history
…Device (#2219)

* Select correct AMQP link for a Module Twin v. and Edge Device.

* Updating unit tests and removing additional projects for testing

* Remove safe close sematics to keep client the same.

* Remove underscore

* Restore AmqpUnit completely, no changes needed here

* Add note to IDelegatingHandler

* Update iothub/device/src/IDelegatingHandler.cs

Co-authored-by: David R. Williamson <drwill@microsoft.com>

* Update iothub/device/src/IDelegatingHandler.cs

Co-authored-by: David R. Williamson <drwill@microsoft.com>

* Update iothub/device/src/ModuleClient.cs

Co-authored-by: David R. Williamson <drwill@microsoft.com>

* Update iothub/device/src/ModuleClient.cs

Co-authored-by: David R. Williamson <drwill@microsoft.com>

* Feedback

* Adding assemblyinfo

* Fixing isAnEdgeModule in comments

* Remove new lines

* Style fixes

Co-authored-by: David R. Williamson <drwill@microsoft.com>

* Style changes

* Move all edge module tests and turn of parallel execution

* Make IotHubConnectionString dictate if the ModuleClient is using a gateway

* remove extra comment

* remove line break

* Fixing order of builder

* Adding disable handler back

Co-authored-by: David R. Williamson <drwill@microsoft.com>
  • Loading branch information
jamdavi and David R. Williamson authored Nov 1, 2021
1 parent a609daa commit 60eaa3b
Show file tree
Hide file tree
Showing 16 changed files with 526 additions and 103 deletions.
13 changes: 9 additions & 4 deletions iothub/device/src/IDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ internal interface IDelegatingHandler : IContinuationProvider<IDelegatingHandler

Task CompleteAsync(string lockToken, CancellationToken cancellationToken);

// Telemetry downlink for modules.
Task EnableEventReceiveAsync(CancellationToken cancellationToken);

Task DisableEventReceiveAsync(CancellationToken cancellationToken);
// Edge Modules and Module Twins have different links to be used for the same function when communicating over AMQP
// We are setting the flag on these methods since the decision should be made at the transport layer and not at the
// client layer.
//
// This means that all other transports will need to implement this method. However they do not need to use the flag
// if there is no behavior change required.
Task EnableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken);

Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken);

// Methods.
Task EnableMethodsAsync(CancellationToken cancellationToken);
Expand Down
35 changes: 20 additions & 15 deletions iothub/device/src/InternalClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1651,14 +1651,15 @@ public Task SendEventBatchAsync(string outputName, IEnumerable<Message> messages
/// <param name="inputName">The name of the input to associate with the delegate.</param>
/// <param name="messageHandler">The delegate to be used when a message is sent to the particular inputName.</param>
/// <param name="userContext">generic parameter to be interpreted by the client code.</param>
/// <param name="isAnEdgeModule">Parameter to correctly select a device module path. This is set by the <see cref="ModuleClient"/> when a <see cref="Edge.EdgeModuleClientFactory"/> creates the module.</param>
/// <returns>The task containing the event</returns>
public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext)
public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext, bool isAnEdgeModule)
{
try
{
// Codes_SRS_DEVICECLIENT_28_013: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs.]
using CancellationTokenSource cts = CancellationTokenSourceFactory();
await SetInputMessageHandlerAsync(inputName, messageHandler, userContext, cts.Token).ConfigureAwait(false);
await SetInputMessageHandlerAsync(inputName, messageHandler, userContext, isAnEdgeModule, cts.Token).ConfigureAwait(false);
}
catch (Exception ex) when (IsCausedByTimeoutOrCanncellation(ex))
{
Expand All @@ -1675,9 +1676,10 @@ public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler m
/// <param name="inputName">The name of the input to associate with the delegate.</param>
/// <param name="messageHandler">The delegate to be used when a message is sent to the particular inputName.</param>
/// <param name="userContext">generic parameter to be interpreted by the client code.</param>
/// <param name="isAnEdgeModule">Parameter to correctly select a device module path. This is set by the <see cref="ModuleClient"/> when a <see cref="Edge.EdgeModuleClientFactory"/> creates the module.</param>
/// <param name="cancellationToken"></param>
/// <returns>The task containing the event</returns>
public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext, CancellationToken cancellationToken)
public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext, bool isAnEdgeModule, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
{
Expand All @@ -1693,8 +1695,9 @@ public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler m
{
if (messageHandler != null)
{
// codes_SRS_DEVICECLIENT_33_003: [ It shall EnableEventReceiveAsync when called for the first time. ]
await EnableEventReceiveAsync(cancellationToken).ConfigureAwait(false);
// When using a device module we need to enable the 'deviceBound' message link
await EnableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);

// codes_SRS_DEVICECLIENT_33_005: [ It shall lazy-initialize the receiveEventEndpoints property. ]
if (_receiveEventEndpoints == null)
{
Expand All @@ -1715,7 +1718,7 @@ public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler m
}

// codes_SRS_DEVICECLIENT_33_004: [ It shall call DisableEventReceiveAsync when the last delegate has been removed. ]
await DisableEventReceiveAsync(cancellationToken).ConfigureAwait(false);
await DisableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);
}
}
finally
Expand All @@ -1737,14 +1740,15 @@ public async Task SetInputMessageHandlerAsync(string inputName, MessageHandler m
/// </summary>
/// <param name="messageHandler">The delegate to be called when a message is sent to any input.</param>
/// <param name="userContext">generic parameter to be interpreted by the client code.</param>
/// <param name="isAnEdgeModule">Parameter to correctly select a device module path. This is set by the <see cref="ModuleClient"/> when a <see cref="Edge.EdgeModuleClientFactory"/> creates the module.</param>
/// <returns>The task containing the event</returns>
public async Task SetMessageHandlerAsync(MessageHandler messageHandler, object userContext)
public async Task SetMessageHandlerAsync(MessageHandler messageHandler, object userContext, bool isAnEdgeModule)
{
try
{
// Codes_SRS_DEVICECLIENT_28_013: [The asynchronous operation shall retry until time specified in OperationTimeoutInMilliseconds property expire or unrecoverable error(authentication, quota exceed) occurs.]
using CancellationTokenSource cts = CancellationTokenSourceFactory();
await SetMessageHandlerAsync(messageHandler, userContext, cts.Token).ConfigureAwait(false);
await SetMessageHandlerAsync(messageHandler, userContext, isAnEdgeModule, cts.Token).ConfigureAwait(false);
}
catch (Exception ex) when (IsCausedByTimeoutOrCanncellation(ex))
{
Expand All @@ -1761,9 +1765,10 @@ public async Task SetMessageHandlerAsync(MessageHandler messageHandler, object u
/// </summary>
/// <param name="messageHandler">The delegate to be called when a message is sent to any input.</param>
/// <param name="userContext">generic parameter to be interpreted by the client code.</param>
/// <param name="isAnEdgeModule">Parameter to correctly select a device module path. This is set by the <see cref="ModuleClient"/> when a <see cref="Edge.EdgeModuleClientFactory"/> creates the module.</param>
/// <param name="cancellationToken"></param>
/// <returns>The task containing the event</returns>
public async Task SetMessageHandlerAsync(MessageHandler messageHandler, object userContext, CancellationToken cancellationToken)
public async Task SetMessageHandlerAsync(MessageHandler messageHandler, object userContext, bool isAnEdgeModule, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
{
Expand All @@ -1778,14 +1783,14 @@ public async Task SetMessageHandlerAsync(MessageHandler messageHandler, object u
if (messageHandler != null)
{
// codes_SRS_DEVICECLIENT_33_003: [ It shall EnableEventReceiveAsync when called for the first time. ]
await EnableEventReceiveAsync(cancellationToken).ConfigureAwait(false);
await EnableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);
_defaultEventCallback = new Tuple<MessageHandler, object>(messageHandler, userContext);
}
else
{
_defaultEventCallback = null;
// codes_SRS_DEVICECLIENT_33_004: [ It shall DisableEventReceiveAsync when the last delegate has been removed. ]
await DisableEventReceiveAsync(cancellationToken).ConfigureAwait(false);
await DisableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);
}
}
finally
Expand Down Expand Up @@ -1867,20 +1872,20 @@ internal async Task OnModuleEventMessageReceivedAsync(string input, Message mess
}

// Enable telemetry downlink for modules
private Task EnableEventReceiveAsync(CancellationToken cancellationToken)
private Task EnableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
{
// The telemetry downlink needs to be enabled only for the first time that the _defaultEventCallback delegate is set.
return _receiveEventEndpoints == null && _defaultEventCallback == null
? InnerHandler.EnableEventReceiveAsync(cancellationToken)
? InnerHandler.EnableEventReceiveAsync(isAnEdgeModule, cancellationToken)
: TaskHelpers.CompletedTask;
}

// Disable telemetry downlink for modules
private Task DisableEventReceiveAsync(CancellationToken cancellationToken)
private Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
{
// The telemetry downlink should be disabled only after _defaultEventCallback delegate has been removed.
return _receiveEventEndpoints == null && _defaultEventCallback == null
? InnerHandler.DisableEventReceiveAsync(cancellationToken)
? InnerHandler.DisableEventReceiveAsync(isAnEdgeModule, cancellationToken)
: TaskHelpers.CompletedTask;
}

Expand Down
9 changes: 6 additions & 3 deletions iothub/device/src/IotHubConnectionString.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ public IotHubConnectionString(IotHubConnectionStringBuilder builder)
}

Audience = builder.HostName;
HostName = string.IsNullOrEmpty(builder.GatewayHostName)
? builder.HostName
: builder.GatewayHostName;
IsUsingGateway = !string.IsNullOrEmpty(builder.GatewayHostName);
HostName = IsUsingGateway
? builder.GatewayHostName
: builder.HostName;
SharedAccessKeyName = builder.SharedAccessKeyName;
SharedAccessKey = builder.SharedAccessKey;
IotHubName = builder.IotHubName;
Expand Down Expand Up @@ -110,5 +111,7 @@ public IotHubConnectionString(IotHubConnectionStringBuilder builder)
public string SharedAccessKey { get; private set; }

public string SharedAccessSignature { get; private set; }

public bool IsUsingGateway { get; private set; }
}
}
23 changes: 19 additions & 4 deletions iothub/device/src/ModuleClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@ public class ModuleClient : IDisposable
{
private const string ModuleMethodUriFormat = "/twins/{0}/modules/{1}/methods?" + ClientApiVersionHelper.ApiVersionQueryStringLatest;
private const string DeviceMethodUriFormat = "/twins/{0}/methods?" + ClientApiVersionHelper.ApiVersionQueryStringLatest;
private bool _isAnEdgeModule;
private readonly ICertificateValidator _certValidator;

internal InternalClient InternalClient { get; private set; }

/// <summary>
/// Constructor for a module client to be created from an <see cref="InternalClient"/>.
/// </summary>
/// <param name="internalClient">The internal client to use for the commands.</param>
internal ModuleClient(InternalClient internalClient) : this(internalClient, NullCertificateValidator.Instance)
{
}

/// <summary>
/// Constructor for a module client to be created from an <see cref="InternalClient"/>. With a specific certificate validator.
/// </summary>
/// <param name="internalClient">The internal client to use for the commands.</param>
/// <param name="certValidator">The custom certificate validator to use for connection.</param>
internal ModuleClient(InternalClient internalClient, ICertificateValidator certValidator)
{
InternalClient = internalClient ?? throw new ArgumentNullException(nameof(internalClient));
Expand All @@ -52,6 +62,11 @@ internal ModuleClient(InternalClient internalClient, ICertificateValidator certV
throw new ArgumentException("A valid module Id should be specified to create a ModuleClient");
}

// There is a distinction between a Module Twin and and Edge module. We set this flag in order
// to correctly select the reciver link for AMQP on a Module Twin. This does not affect MQTT.
// We can determine that this is an edge module if the connection string is using a gateway host.
_isAnEdgeModule = internalClient.IotHubConnectionString.IsUsingGateway;

if (Logging.IsEnabled)
Logging.Associate(this, this, internalClient, nameof(ModuleClient));
}
Expand Down Expand Up @@ -678,7 +693,7 @@ public Task SendEventBatchAsync(string outputName, IEnumerable<Message> messages
/// <exception cref="OperationCanceledException">Thrown when the operation has been canceled.</exception>
/// <returns>The task containing the event</returns>
public Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext) =>
InternalClient.SetInputMessageHandlerAsync(inputName, messageHandler, userContext);
InternalClient.SetInputMessageHandlerAsync(inputName, messageHandler, userContext, _isAnEdgeModule);

/// <summary>
/// Sets a new delegate for the particular input. If a delegate is already associated with
Expand All @@ -691,7 +706,7 @@ public Task SetInputMessageHandlerAsync(string inputName, MessageHandler message
/// <exception cref="OperationCanceledException">Thrown when the operation has been canceled.</exception>
/// <returns>The task containing the event</returns>
public Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext, CancellationToken cancellationToken) =>
InternalClient.SetInputMessageHandlerAsync(inputName, messageHandler, userContext, cancellationToken);
InternalClient.SetInputMessageHandlerAsync(inputName, messageHandler, userContext, _isAnEdgeModule, cancellationToken);

/// <summary>
/// Sets a new default delegate which applies to all endpoints. If a delegate is already associated with
Expand All @@ -703,7 +718,7 @@ public Task SetInputMessageHandlerAsync(string inputName, MessageHandler message
/// <exception cref="OperationCanceledException">Thrown when the operation has been canceled.</exception>
/// <returns>The task containing the event</returns>
public Task SetMessageHandlerAsync(MessageHandler messageHandler, object userContext) =>
InternalClient.SetMessageHandlerAsync(messageHandler, userContext);
InternalClient.SetMessageHandlerAsync(messageHandler, userContext, _isAnEdgeModule);

/// <summary>
/// Sets a new default delegate which applies to all endpoints. If a delegate is already associated with
Expand All @@ -716,7 +731,7 @@ public Task SetMessageHandlerAsync(MessageHandler messageHandler, object userCon
/// <exception cref="OperationCanceledException">Thrown when the operation has been canceled.</exception>
/// <returns>The task containing the event</returns>
public Task SetMessageHandlerAsync(MessageHandler messageHandler, object userContext, CancellationToken cancellationToken) =>
InternalClient.SetMessageHandlerAsync(messageHandler, userContext, cancellationToken);
InternalClient.SetMessageHandlerAsync(messageHandler, userContext, _isAnEdgeModule, cancellationToken);

/// <summary>
/// Interactively invokes a method from an edge module to an edge device.
Expand Down
30 changes: 20 additions & 10 deletions iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class AmqpTransportHandler : TransportHandler

private const int ResponseTimeoutInSeconds = 300;
private readonly TimeSpan _operationTimeout;
private readonly AmqpUnit _amqpUnit;
protected AmqpUnit _amqpUnit;
private readonly Action<TwinCollection> _onDesiredStatePatchListener;
private readonly object _lock = new object();
private ConcurrentDictionary<string, TaskCompletionSource<Twin>> _twinResponseCompletions = new ConcurrentDictionary<string, TaskCompletionSource<Twin>>();
Expand Down Expand Up @@ -432,20 +432,30 @@ private async Task<Twin> RoundTripTwinMessageAsync(

#region Events

public override async Task EnableEventReceiveAsync(CancellationToken cancellationToken)
public override async Task EnableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
{
Logging.Enter(this, cancellationToken, nameof(EnableEventReceiveAsync));

try
// If an AMQP transport is opened as a module twin instead of an Edge module we need
// to enable the deviceBound operations instead of the event receiver link
if (isAnEdgeModule)
{
cancellationToken.ThrowIfCancellationRequested();
Logging.Enter(this, cancellationToken, nameof(EnableEventReceiveAsync));

await _amqpUnit.EnableEventReceiveAsync(_operationTimeout).ConfigureAwait(false);
}
finally
try
{
cancellationToken.ThrowIfCancellationRequested();

await _amqpUnit.EnableEventReceiveAsync(_operationTimeout).ConfigureAwait(false);
}
finally
{
Logging.Exit(this, cancellationToken, nameof(EnableEventReceiveAsync));
}
}
else
{
Logging.Exit(this, cancellationToken, nameof(EnableEventReceiveAsync));
await EnableReceiveMessageAsync(cancellationToken).ConfigureAwait(false);
}

}

#endregion Events
Expand Down
8 changes: 4 additions & 4 deletions iothub/device/src/Transport/DefaultDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,16 @@ public virtual Task SendTwinPatchAsync(TwinCollection reportedProperties, Cancel
return InnerHandler?.SendTwinPatchAsync(reportedProperties, cancellationToken) ?? TaskHelpers.CompletedTask;
}

public virtual Task EnableEventReceiveAsync(CancellationToken cancellationToken)
public virtual Task EnableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return InnerHandler?.EnableEventReceiveAsync(cancellationToken) ?? TaskHelpers.CompletedTask;
return InnerHandler?.EnableEventReceiveAsync(isAnEdgeModule, cancellationToken) ?? TaskHelpers.CompletedTask;
}

public virtual Task DisableEventReceiveAsync(CancellationToken cancellationToken)
public virtual Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return InnerHandler?.DisableEventReceiveAsync(cancellationToken) ?? TaskHelpers.CompletedTask;
return InnerHandler?.DisableEventReceiveAsync(isAnEdgeModule, cancellationToken) ?? TaskHelpers.CompletedTask;
}

public virtual bool IsUsable => InnerHandler?.IsUsable ?? true;
Expand Down
Loading

0 comments on commit 60eaa3b

Please sign in to comment.