Skip to content

Commit

Permalink
Fix reconnect/reset connection API (#2305)
Browse files Browse the repository at this point in the history
Fix reconnect/reset connection API (#2291) and swagger update.
  • Loading branch information
marcschier authored Jul 29, 2024
1 parent e8b60aa commit 3f8a26e
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 21 deletions.
4 changes: 2 additions & 2 deletions docs/opc-publisher/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,8 @@ Get diagnostics for all active clients including server and client session diagn
* `application/x-msgpack`


<a name="resetallclients"></a>
#### ResetAllClients
<a name="resetallconnections"></a>
#### ResetAllConnections
```
GET /v2/reset
```
Expand Down
4 changes: 2 additions & 2 deletions docs/opc-publisher/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1140,9 +1140,9 @@
"tags": [
"Diagnostics"
],
"summary": "ResetAllClients",
"summary": "ResetAllConnections",
"description": "Can be used to reset all established connections causing a full reconnect and recreate of all subscriptions.",
"operationId": "ResetAllClients",
"operationId": "ResetAllConnections",
"produces": [
"application/json",
"application/x-msgpack"
Expand Down
4 changes: 4 additions & 0 deletions docs/web-api/definitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,8 @@ Contains the nodes which should be published
|**LastChangeDateTime** <br>*optional*|Last change to the entry|string (date-time)|
|**MaxKeepAliveCount** <br>*optional*|When the publishing timer has expired this number of<br>times without requiring any Notification to be sent,<br>to the writer a keep-alive message is sent.|integer (int64)|
|**MessageEncoding** <br>*optional*||[MessageEncoding](definitions.md#messageencoding)|
|**MessageRetention** <br>*optional*|Message retention setting for messages sent by<br>the writer if the transport supports it.|boolean|
|**MessageTtlTimespan** <br>*optional*|Message time to live for messages sent by the<br>writer if the transport supports it.|string (date-span)|
|**MessagingMode** <br>*optional*||[MessagingMode](definitions.md#messagingmode)|
|**MetaDataQueueName** <br>*optional*|Meta data queue name to use for the writer. Overrides<br>the default metadata topic template.|string|
|**MetaDataUpdateTime** <br>*optional*|Send metadata at the configured interval<br>even when not changing expressed in milliseconds.|integer (int32)|
Expand All @@ -1530,6 +1532,8 @@ Contains the nodes which should be published
|**UseReverseConnect** <br>*optional*|Use reverse connect to connect ot the endpoint|boolean|
|**UseSecurity** <br>*optional*|Secure transport should be used to connect to<br>the opc server.|boolean|
|**Version** <br>*optional*|Version number of the entry|integer (int64)|
|**WriterGroupMessageRetention** <br>*optional*|Default message retention setting for messages sent<br>through the writer group if the transport supports it.|boolean|
|**WriterGroupMessageTtlTimepan** <br>*optional*|Default time to live for messages sent through<br>the writer group if the transport supports it.|string (date-span)|
|**WriterGroupPartitions** <br>*optional*|Number of partitions to split the writer group into<br>when publishing to target topics.|integer (int32)|
|**WriterGroupQualityOfService** <br>*optional*||[QoS](definitions.md#qos)|
|**WriterGroupQueueName** <br>*optional*|Writer group queue overrides the default writer group<br>topic template to use.|string|
Expand Down
18 changes: 18 additions & 0 deletions docs/web-api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -8243,6 +8243,24 @@
"description": "Whether to fetch the display name and use it as\r\ndata set id for all opc node items in the data set",
"type": "boolean"
},
"WriterGroupMessageTtlTimepan": {
"format": "date-span",
"description": "Default time to live for messages sent through\r\nthe writer group if the transport supports it.",
"type": "string"
},
"WriterGroupMessageRetention": {
"description": "Default message retention setting for messages sent\r\nthrough the writer group if the transport supports it.",
"type": "boolean"
},
"MessageTtlTimespan": {
"format": "date-span",
"description": "Message time to live for messages sent by the\r\nwriter if the transport supports it.",
"type": "string"
},
"MessageRetention": {
"description": "Message retention setting for messages sent by\r\nthe writer if the transport supports it.",
"type": "boolean"
},
"NodeId": {
"$ref": "#/definitions/NodeIdModel"
}
Expand Down
6 changes: 6 additions & 0 deletions samples/IoTHub/IoTHubSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ApproveRejected", "ApproveR
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ManageWriter", "ManageWriter\ManageWriter.csproj", "{E143AD3D-AE31-4AAE-AEE5-1AA48D1EC6BA}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GetAndResetConnections", "ResetClients\GetAndResetConnections.csproj", "{4268F412-F656-48B6-98A8-35E88478D376}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -57,6 +59,10 @@ Global
{E143AD3D-AE31-4AAE-AEE5-1AA48D1EC6BA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E143AD3D-AE31-4AAE-AEE5-1AA48D1EC6BA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E143AD3D-AE31-4AAE-AEE5-1AA48D1EC6BA}.Release|Any CPU.Build.0 = Release|Any CPU
{4268F412-F656-48B6-98A8-35E88478D376}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4268F412-F656-48B6-98A8-35E88478D376}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4268F412-F656-48B6-98A8-35E88478D376}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4268F412-F656-48B6-98A8-35E88478D376}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
41 changes: 41 additions & 0 deletions samples/IoTHub/ResetClients/GetAndResetConnections.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------

using CommandLine;
using InvokeDeviceMethod;
using Microsoft.Azure.Devices;
using System.Net;
using System.Text.Json;

Parameters? parameters = null;
ParserResult<Parameters> result = Parser.Default.ParseArguments<Parameters>(args)
.WithParsed(parsedParams => parameters = parsedParams)
.WithNotParsed(errors => Environment.Exit(1));

// This sample accepts the service connection string as a parameter, if present.
Parameters.ValidateConnectionStrings(parameters?.IoTHubOwnerConnectionString, parameters?.EdgeHubConnectionString,
out var deviceId, out var moduleId);

// Create a ServiceClient to communicate with service-facing endpoint on your hub.
using var serviceClient = ServiceClient.CreateFromConnectionString(parameters!.IoTHubOwnerConnectionString);

Console.WriteLine("Active connections:");
var methodInvocation = new CloudToDeviceMethod("GetActiveConnections_V2")
{
ResponseTimeout = TimeSpan.FromSeconds(30),
};
var response = await serviceClient.InvokeDeviceMethodAsync(deviceId, moduleId,
methodInvocation).ConfigureAwait(false);
var connections = JsonSerializer.Serialize(JsonSerializer.Deserialize<JsonElement>(response.GetPayloadAsJson()), Parameters.Indented);
Console.WriteLine(connections);

Console.WriteLine("Resetting all connections...");
methodInvocation = new CloudToDeviceMethod("ResetAllConnections_V2")
{
ResponseTimeout = TimeSpan.FromSeconds(30),
};
response = await serviceClient.InvokeDeviceMethodAsync(deviceId, moduleId,
methodInvocation).ConfigureAwait(false);
Console.WriteLine(response.Status);
15 changes: 15 additions & 0 deletions samples/IoTHub/ResetClients/GetAndResetConnections.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\Parameters.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices" Version="1.39.1" />
<PackageReference Include="CommandLineParser" Version="2.9.1" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public DiagnosticsController(IClientDiagnostics diagnostics)
}

/// <summary>
/// ResetAllClients
/// ResetAllConnections
/// </summary>
/// <remarks>
/// Can be used to reset all established connections causing a full
Expand All @@ -68,7 +68,7 @@ public DiagnosticsController(IClientDiagnostics diagnostics)
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status408RequestTimeout)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status500InternalServerError)]
[HttpGet("reset")]
public async Task ResetAllClientsAsync(CancellationToken ct = default)
public async Task ResetAllConnectionsAsync(CancellationToken ct = default)
{
await _diagnostics.ResetAllConnectionsAsync(ct).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public static void AddPublisherServices(this ContainerBuilder builder)
.AsImplementedInterfaces();
builder.RegisterType<CertificatesController>()
.AsImplementedInterfaces();
builder.RegisterType<DiagnosticsController>()
.AsImplementedInterfaces();
}

/// <summary>
Expand Down
32 changes: 17 additions & 15 deletions src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -746,19 +746,20 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct)
switch (trigger)
{
case ConnectionEvent.Reset:
// If currently reconnecting, dispose the reconnect handler and stop timer
if (currentSessionState != SessionState.Connected)
{
(context as TaskCompletionSource)?.TrySetResult();
break;
}
// If currently reconnecting, dispose the reconnect handler
_reconnectHandler.CancelReconnect();
NotifyConnectivityStateChange(EndpointConnectivityState.Disconnected);
currentSubscriptions.ForEach(h => h.NotifySessionConnectionState(true));

// Clean up
await CloseSessionAsync().ConfigureAwait(false);
Debug.Assert(_session == null);

currentSessionState = SessionState.Disconnected;
(context as TaskCompletionSource)?.TrySetResult();

goto case ConnectionEvent.Connect;
//
// Close bypassing everything but keep channel open then trigger a
// reconnect. The reconnect will recreate the session and subscriptions
//
Debug.Assert(_session != null);
await _session.CloseAsync(false, default).ConfigureAwait(false);
goto case ConnectionEvent.StartReconnect;
case ConnectionEvent.Connect:
if (currentSessionState == SessionState.Disconnected)
{
Expand Down Expand Up @@ -792,6 +793,7 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct)
Debug.Assert(_session != null);

// Allow access to session now
Debug.Assert(_disconnectLock != null);
_disconnectLock.Dispose();
_disconnectLock = null;

Expand Down Expand Up @@ -852,9 +854,8 @@ await ApplySubscriptionAsync(new[] { item }, queuedSubscriptions,
Debug.Assert(_disconnectLock == null);
_disconnectLock = await _lock.WriterLockAsync(ct);

_logger.LogInformation(
"{Client}: Reconnecting session {Session} due to error {Error}...",
this, _sessionName, context as ServiceResult);
_logger.LogInformation("{Client}: Reconnecting session {Session} due to {Reason}...",
this, _sessionName, (context is ServiceResult sr) ? "error " + sr : "RESET");
var state = _reconnectHandler.BeginReconnect(_session,
_reverseConnectManager, GetMinReconnectPeriod(), (sender, evt) =>
{
Expand All @@ -873,6 +874,7 @@ await ApplySubscriptionAsync(new[] { item }, queuedSubscriptions,
currentSessionState = SessionState.Reconnecting;
_reconnectingSession?.SubscriptionHandles
.ForEach(h => h.NotifySessionConnectionState(true));
(context as TaskCompletionSource)?.TrySetResult();
break;
case SessionState.Connecting:
case SessionState.Disconnected:
Expand Down

0 comments on commit 3f8a26e

Please sign in to comment.