Skip to content

Commit

Permalink
Enable Durable Subscriptions (#9)
Browse files Browse the repository at this point in the history
* [Server] Durable Subscriptions (OPCFoundation#2683)

* Implement a system test for DurableSubscriptions (OPCFoundation#2839)
* Client Side
Co-authored-by: ALTERNATE-DEV\Archie <archie@dreamsandadventures.ca>
  • Loading branch information
romanett authored Jan 31, 2025
1 parent cf82847 commit e0a644f
Show file tree
Hide file tree
Showing 50 changed files with 4,994 additions and 862 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/buildandtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Build and Test .NET 9.0
on:
push:
pull_request:
branches: [ master, main ]
branches: [ master, main, develop/* ]
paths:
- '**.cs'
- '**.csproj'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: "CodeQL"

on:
push:
branches: [ master, main, release/* ]
branches: [ master, main, release/*, develop/* ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, main ]
Expand Down
221 changes: 209 additions & 12 deletions Applications/ConsoleReferenceClient/ClientSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
Expand Down Expand Up @@ -69,6 +70,13 @@ public ClientSamples(TextWriter output, Action<IList, IList> validateResponse, M
m_validateResponse = validateResponse ?? ClientBase.ValidateResponse;
m_quitEvent = quitEvent;
m_verbose = verbose;
m_desiredEventFields = new Dictionary<int, QualifiedNameCollection>();
int eventIndexCounter = 0;
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Time }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.ActiveState }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Message }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.LimitState, BrowseNames.CurrentState }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.LimitState, BrowseNames.LastTransition }));
}

#region Public Sample Methods
Expand Down Expand Up @@ -290,34 +298,107 @@ public void CallMethod(ISession session)
}

/// <summary>
/// Create Subscription and MonitoredItems for DataChanges
/// Call the Start method for Alarming to enable events
/// </summary>
public void SubscribeToDataChanges(ISession session, uint minLifeTime)
public void EnableEvents(ISession session, uint timeToRun)
{
if (session == null || session.Connected == false)
{
m_output.WriteLine("Session not connected!");
return;
}

try
{
// Define the UA Method to call
// Parent node - Objects\CTT\Alarms
// Method node - Objects\CTT\Alarms\Start
NodeId objectId = new NodeId("ns=7;s=Alarms");
NodeId methodId = new NodeId("ns=7;s=Alarms.Start");

// Define the method parameters
// Input argument requires a Float and an UInt32 value
object[] inputArguments = new object[] { timeToRun };
IList<object> outputArguments = null;

// Invoke Call service
m_output.WriteLine("Calling UAMethod for node {0} ...", methodId);
outputArguments = session.Call(objectId, methodId, inputArguments);

// Display results
m_output.WriteLine("Method call returned {0} output argument(s):", outputArguments.Count);

foreach (var outputArgument in outputArguments)
{
m_output.WriteLine(" OutputValue = {0}", outputArgument.ToString());
}
}
catch (Exception ex)
{
m_output.WriteLine("Method call error: {0}", ex.Message);
}
}

/// <summary>
/// Create Subscription and MonitoredItems for DataChanges
/// </summary>
public bool SubscribeToDataChanges(ISession session, uint minLifeTime, bool enableDurableSubscriptions)
{
bool isDurable = false;

if (session == null || session.Connected == false)
{
m_output.WriteLine("Session not connected!");
return isDurable;
}

try
{
// Create a subscription for receiving data change notifications
int subscriptionPublishingInterval = 1000;
int itemSamplingInterval = 1000;
uint queueSize = 10;
uint lifetime = minLifeTime;

if (enableDurableSubscriptions)
{
queueSize = 100;
lifetime = 20;
}

// Define Subscription parameters
Subscription subscription = new Subscription(session.DefaultSubscription) {
DisplayName = "Console ReferenceClient Subscription",
PublishingEnabled = true,
PublishingInterval = 1000,
PublishingInterval = subscriptionPublishingInterval,
LifetimeCount = 0,
MinLifetimeInterval = minLifeTime,
MinLifetimeInterval = lifetime,
KeepAliveCount = 5,
};

session.AddSubscription(subscription);

// Create the subscription on Server side
subscription.Create();
m_output.WriteLine("New Subscription created with SubscriptionId = {0}.", subscription.Id);
m_output.WriteLine("New Subscription created with SubscriptionId = {0}, Sampling Interval {1}, Publishing Interval {2}.",
subscription.Id, itemSamplingInterval, subscriptionPublishingInterval);

if (enableDurableSubscriptions)
{
uint revisedLifetimeInHours = 0;

if (subscription.SetSubscriptionDurable(1, out revisedLifetimeInHours))
{
isDurable = true;

m_output.WriteLine("Subscription {0} is now durable, Revised Lifetime {1} in hours.",
subscription.Id, revisedLifetimeInHours);
}
else
{
m_output.WriteLine("Subscription {0} failed durable call", subscription.Id);
}
}

// Create MonitoredItems for data changes (Reference Server)

Expand All @@ -326,8 +407,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
intMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_Int32");
intMonitoredItem.AttributeId = Attributes.Value;
intMonitoredItem.DisplayName = "Int32 Variable";
intMonitoredItem.SamplingInterval = 1000;
intMonitoredItem.QueueSize = 10;
intMonitoredItem.SamplingInterval = itemSamplingInterval;
intMonitoredItem.QueueSize = queueSize;
intMonitoredItem.DiscardOldest = true;
intMonitoredItem.Notification += OnMonitoredItemNotification;

Expand All @@ -338,8 +419,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
floatMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_Float");
floatMonitoredItem.AttributeId = Attributes.Value;
floatMonitoredItem.DisplayName = "Float Variable";
floatMonitoredItem.SamplingInterval = 1000;
floatMonitoredItem.QueueSize = 10;
floatMonitoredItem.SamplingInterval = itemSamplingInterval;
floatMonitoredItem.QueueSize = queueSize;
floatMonitoredItem.Notification += OnMonitoredItemNotification;

subscription.AddItem(floatMonitoredItem);
Expand All @@ -349,12 +430,54 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
stringMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_String");
stringMonitoredItem.AttributeId = Attributes.Value;
stringMonitoredItem.DisplayName = "String Variable";
stringMonitoredItem.SamplingInterval = 1000;
stringMonitoredItem.QueueSize = 10;
stringMonitoredItem.SamplingInterval = itemSamplingInterval;
stringMonitoredItem.QueueSize = queueSize;
stringMonitoredItem.Notification += OnMonitoredItemNotification;

subscription.AddItem(stringMonitoredItem);

MonitoredItem eventMonitoredItem = new MonitoredItem(subscription.DefaultItem);
eventMonitoredItem.StartNodeId = new NodeId(Opc.Ua.ObjectIds.Server);
eventMonitoredItem.AttributeId = Attributes.EventNotifier;
eventMonitoredItem.DisplayName = "Event Variable";
eventMonitoredItem.SamplingInterval = itemSamplingInterval;
eventMonitoredItem.QueueSize = queueSize;
eventMonitoredItem.Notification += OnMonitoredItemEventNotification;

EventFilter filter = new EventFilter();

SimpleAttributeOperandCollection simpleAttributeOperands = new SimpleAttributeOperandCollection();

foreach (QualifiedNameCollection desiredEventField in m_desiredEventFields.Values)
{
simpleAttributeOperands.Add(new SimpleAttributeOperand() {
AttributeId = Attributes.Value,
TypeDefinitionId = ObjectTypeIds.BaseEventType,
BrowsePath = desiredEventField
});
}
filter.SelectClauses = simpleAttributeOperands;

ContentFilter whereClause = new ContentFilter();
SimpleAttributeOperand existingEventType = new SimpleAttributeOperand() {
AttributeId = Attributes.Value,
TypeDefinitionId = ObjectTypeIds.ExclusiveLevelAlarmType,
BrowsePath = new QualifiedNameCollection(new QualifiedName[] { "EventType" })
};
LiteralOperand desiredEventType = new LiteralOperand();
desiredEventType.Value = new Variant(new NodeId(Opc.Ua.ObjectTypeIds.ExclusiveLevelAlarmType));


whereClause.Push(FilterOperator.Equals, new FilterOperand[] { existingEventType, desiredEventType });

filter.WhereClause = whereClause;

eventMonitoredItem.Filter = filter;
eventMonitoredItem.NodeClass = NodeClass.Object;


subscription.AddItem(eventMonitoredItem);

// Create the monitored items on Server side
subscription.ApplyChanges();
m_output.WriteLine("MonitoredItems created for SubscriptionId = {0}.", subscription.Id);
Expand All @@ -363,6 +486,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
{
m_output.WriteLine("Subscribe error: {0}", ex.Message);
}

return isDurable;
}
#endregion

Expand Down Expand Up @@ -1186,14 +1311,83 @@ private void OnMonitoredItemNotification(MonitoredItem monitoredItem, MonitoredI
{
// Log MonitoredItem Notification event
MonitoredItemNotification notification = e.NotificationValue as MonitoredItemNotification;
m_output.WriteLine("Notification: {0} \"{1}\" and Value = {2}.", notification.Message.SequenceNumber, monitoredItem.ResolvedNodeId, notification.Value);
DateTime localTime = notification.Value.SourceTimestamp.ToLocalTime();
m_output.WriteLine("Notification: {0} \"{1}\" and Value = {2} at [{3}].",
notification.Message.SequenceNumber,
monitoredItem.ResolvedNodeId,
notification.Value,
localTime.ToLongTimeString());
}
catch (Exception ex)
{
m_output.WriteLine("OnMonitoredItemNotification error: {0}", ex.Message);
}
}

/// <summary>
/// Handle Requested Event notifications from Server
/// </summary>
private void OnMonitoredItemEventNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
{
try
{
// Log MonitoredItem Notification event
EventFieldList notification = e.NotificationValue as EventFieldList;

foreach (KeyValuePair<int, QualifiedNameCollection> entry in m_desiredEventFields)
{
Variant field = notification.EventFields[entry.Key];
if (field.TypeInfo.BuiltInType != BuiltInType.Null)
{
StringBuilder fieldPath = new StringBuilder();

int lastIndex = entry.Value.Count - 1;
for (int index = 0; index < entry.Value.Count; index++)
{
fieldPath.Append(entry.Value[index].Name);
if (index < lastIndex)
{
fieldPath.Append(".");
}
}

string fieldName = fieldPath.ToString();
if (fieldName.Equals("Time"))
{
try
{
DateTime currentTime = (DateTime)field.Value;
TimeSpan timeSpan = currentTime - m_lastEventTime;
m_lastEventTime = currentTime;
m_processedEvents++;
string timeBetweenEvents = "";
if (m_processedEvents > 1)
{
timeBetweenEvents = ", time since last event = " + timeSpan.Seconds.ToString() + " seconds";
}

m_output.WriteLine("Event Received - total count = {0}{1}",
m_processedEvents.ToString(),
timeBetweenEvents);
}
catch (Exception ex)
{
m_output.WriteLine("Unexpected error retrieving Event Time Field Value: {0}", ex.Message);
}
}

m_output.WriteLine("\tField [{0}] \"{1}\" = [{2}]",
entry.Key.ToString(), fieldName, field.Value);
}
}
}
catch (Exception ex)
{
m_output.WriteLine("OnMonitoredItemEventNotification error: {0}", ex.Message);
}
}


/// <summary>
/// Event handler to defer publish response sequence number acknowledge.
/// </summary>
Expand Down Expand Up @@ -1256,5 +1450,8 @@ private static ByteStringCollection PrepareBrowseNext(BrowseResultCollection bro
private readonly TextWriter m_output;
private readonly ManualResetEvent m_quitEvent;
private readonly bool m_verbose;
private Dictionary<int, QualifiedNameCollection> m_desiredEventFields = null;
private int m_processedEvents = 0;
private DateTime m_lastEventTime = DateTime.Now;
}
}
Loading

0 comments on commit e0a644f

Please sign in to comment.