- Overview
- Prerequisites
- Create Sandbox Resource Group
- Azure IoT Hub Instance
- Simulated IoT Devices
- Azure Digital Twins Instance
- Azure TwinSync Functions
- Plug-and-Play IoT Devices
- AnyLogic Simulation
- Python Simulation
- Databricks Simulation
- Microsoft Bonsai Teaching
In this tutorial, you will create a sandbox environment in which you can further explore the architectural components of a specific Azure IoT architecture aimed at teaching AI through digital twin simulation. You will be completing the following tasks:
- Create an Azure IoT Hub
- Use Azure IoT Explorer to register IoT devices
- Create Simulated IoT Devices
- Create an Azure Digital Twins instance
- Learn to use the Manufacturing Ontology to create digital twins for a typical Manufacturer
- Use Azure Functions to synchronise data between components
- Auto-Provision Simulated Plug-and-Play IoT Devices
- Auto-Provision Physical Plug-and-Play IoT Devices (in this case, an MXCHIP AZ3166 multi-sensor device)
- Auto-Retire IoT Devices
- Create an AnyLogic Simulation
- Create a Python Simulation
- Create a Databricks Simulation
- Extend Simulations to integrate with Microsoft Bonsai
- Use Bonsai to teach an AI 'Brain' from a Simulation
- Apply the Bonsai Brain to a Simulation for verification
- Apply the Bonsai Brain to a digital twin
Before you begin, you may want to clone this entire repository to your local machine, to make some of the steps below more straightforward. This is a collaborative tutorial and sample. Please use the Issues feature of Github to notify others and get help if you run into issues with the instructions or and of the code samples. Issues can be addressed by anyone on the team. Please feel free to submit improvements to the repository, but please document any changes.
- Authorization to use a sandbox obtained from chris.lowndes@avanade.com (if using a provided Sandbox environmment)
- An Avanade or Accenture domain account (if using a provided Sandbox environmment)
- An Azure subscription for which you have global admin access (if not using a provided Sandbox environmment)
- (Optional) An MXCHIP AZ3166 multi-sensor device
To create your own sandbox environment, you will execute various commands in the Azure Cloud Shell. To access the Azure Cloud Shell, follow these steps:
- Navigate to the Azure Portal at https://portal.azure.com
- Login to the Azure Portal using your usual Avanade or Accenture credentials or global admin credentials for the subscription
- Once logged in, in the main search bar, type 'subscriptions'
- In the list of subscriptions, choose the subscription with ID as provided by the facilitator
- Once switched to the required subscription, select the Cloud Shell button shown circled in red
- Once the Cloud Shell opens at the bottom of your browser, switch from Bash mode to PowerShell mode
To create a resource group to hold all of your sandbox resources, follow these steps:
- Your sandbox name is your choice, but should follow this pattern:
industryx-sandbox-<<your-initials-or-name>> #this is your sandbox name. Please copy it as you will be using it frequently
- Execute the following command in the Cloud Shell
az group create --name <<your-sandbox-name>> --location japaneast
To create the IoT Hub instance follow these steps:
- Execute the following command using the Azure Cloud Shell
az deployment group create --resource-group <<your-sandbox-name>> --template-uri https://raw.githubusercontent.com/lowndesc/industryx/main/sandbox/IoTHub/azuredeploy.json
- Navigate to your sandbox resource group within the Azure Portal. You should see two resources similar to this image
For our sandbox, we will first create code-based 3 simulated devices, each transmitting the same range of telemetry on a schedule, using randomisation to vary the values transmitted and the scheduling.
To create the simulated devices, follow these steps:
- Navigate to your IoT Hub instance
- On the left-hand menu, under 'Settings', click 'Shared access policies'
- On the list of Shared access policies, click the policy named 'device'
- On the policy keys panel, copy the 'Primary connection string' and keep it for later use
- In the Azure Cloud Shell, click on the 'Upload Files' button
- In the 'Open' dialog, post the following file URL into the 'File name:' field
https://raw.githubusercontent.com/lowndesc/industryx/main/sandbox/SimulatedDevices/SimulatorCloudRunner.ps1
- This will copy a PowerShell script file from GitHub to your AZure Cloud Shell session
- Execute the following command in Azure Cloud Shell to run the PowerShell script, using the values copied earler
./SimulatorCloudRunner.ps1 -ResourceGroup <<your-sandbox-name>> -IotHubConnectionString <<your-iothub-connection-string>>
- After a few seconds, you should see a JSON confirmation that the process has started
- After around 1 minute, the 3 simulated device containers will have been deployed. Check your resource group to confirm.
To connect the simulated devices to the IoT Hub, follow these steps:
- Navigate to your IoT Hub instance
- In the left-hand menu, under 'Explorers', click 'IoT Devices'
- Click 'New' to add a new device
- For 'Device ID', type 'sim000001'
- Leave all other fields unchanged, and click 'Save'
- Repeat for two more devices, 'sim000002' and 'sim000003'
- Your device list should now look like this
To run the simulated devices, follow these steps:
- Navigate to the first simulated device container
- Click on 'Start' to start the container
- Repeat for each of your simulated device containers
- Once each container has started, the onboard functions will connect a device to the IoTHub using the registered device names you created in the previous task
- To verify that the connection is good. navigate back to the IoT Hub insatnce, and on the 'Overview' pane check the 'Iot Hub Usage' panel and it should show 'Iot Devices: 3' and 'Messages used today:' should be a number greater than 0
For our sandbox environment, we will be implementing digital twins using Microsoft's Azure Digital Twins (ADT). ADT is based on a spatial graph model as opposed to traditional relational database technology. In relational databases, to analyse relationships across different table entities, the time expensive ‘JOIN’ operation is used to combine related data. This operation is expensive as it requires index lookups and matching to related columns in other tables. This is a major advantage of graph data models. Graphs store entities and their relationships as nodes and content which may be augmented with additional attributes. Retrieving the relationship between two entities does not involve expensive ‘join’ operations.
Within our sandbox, we therefore require a single Azure Digital Twins (ADT) instance. Each instance can support many different digital twins based on many different models. For simplicity, it is best practice to use a separate ADT instance for each solution domain. ADT hosts the definitions of digital twins, based on underlying models, as well as the data describing the digital twins, including twin instances, components, relationships and properties. It does not, however, store underlying property and telemetry values, which are accessed from underlying storage. This makes ADT a highly performant abstraction of the digital twins it hosts.
ADT uses models to define digital twin types. Modelling is defined in a JSON-LD-based descriptive language called DTDL (Digital Twins Definition Language). DTDL models are interchangeble between device twins used in IoT Plug-and-Play scenarios and digital twins of those devices within Azure Digital Twins. Each DTDL model is an interface which defines the permissible structure of a digital twin. DTDL supports inheritance, such that one or more interfaces can be a base for other derived interfaces. A model is a generally a definition which can be instantiated as a digital twin. Howevere, a model can also be a component, used to compose other models, but not intended for instantiation by itself. An example would be a smartphone device, defined as containing components defining a front camera and a rear camera.
For manufaturing scenarios, we need a set of base models which can form the foundation for describing manufacturing assets and processes. These base models need to be extendible to problem-specific scenrios, such as production monitoring, optimization and simulation, as well as wider scenarios such as materials handling and supply chain modelling. These base models need to follow industry-specific standards, such as ISA-95 and ISA-88, and be gathered into an overall model known as a manufacturing ontology.
The starting point for our ontology is this basic data model as defined by the Industrial Automation standard ISA-95:
From this, we have derived a basic manufacturing ontology as captured in this diagram. We have incuded a few example Azure IoT Plug-and-PLay devices into our ontology, to illustrate how these interact with the manufacturing assets and processes. We are to use this ontology within our sandbox environments:
Take a moment to examine the manufacturing ontology in its raw JSON DTDL form at this location in this repository:
./sandbox/AzureDigitalTwins/models/manufacturing-ontology This entire folder/file structure is in the clone on your local drive for later use
To create your ADT instance within your sandbox, execute the following steps:
- Execute the following command in the Azure Cloud Shell:
az deployment group create --resource-group <<your-sandbox-name>> --template-uri https://raw.githubusercontent.com/lowndesc/industryx/main/sandbox/AzureDigitalTwins/azuredeploy.json
- When execution has finished, navigate to your sandbox ADT instance, on the Overview pain, copy the host name URL. You will need this later. In order to create permissions for you to access your ADT instance, you need to add yourself as an owner of this instance.
- On the left-hand menu, select 'Access Control (IAM)'
- On the 'Access Control (IAM)' pane, click +Add, and then 'Add role sssignment'
- In the 'Add role assignment' panel, in the Role dropdown, select the 'Azure Digital Twins Data Owner' role
- In the 'Select' field, type the start of your name, then select your user account from the list to add your user account to the 'Selected members' list
- Click Save to create the role assignment
- To verify that you can now access your ADT instance, navigate to the online ADT explorer.
- In the pop-up that asks for an Azure Digital Twins URL, type 'https://<<your host name URL from step 2 above>>' and click Save.
- On the top bar, click the cog icon to go to Settings. Set the Console and Output toggles to on.
- Now, when you click 'Run Query' in the top right, you should see a message in the centre pain that states 'No Results Found'.
Execute the following steps to upload the manufacturing ontology into the ADT model library:
- At the top of the left-hand Models panel, click the 'Upload a directory of Models' button:
- Select the folder at the head of your local copy of the manufacturing ontology 'manufacturing-ontology'
- Click Upload, and then OK when the system reminds you that you are uploading 40 models.
- When these have uploaded, on the centre pane, click the 'MODEL GRAPH' tab.
- You should see the entire manufacturing ontology with relationships and inheritance depicted.
We will now create sample digital twins of an entire manufacturing operation.
Execute the following steps to create the sample digital twins:
- In Azure Digital Twins Explorer, at the top of the centre pane, click the 'TWIN GRAPH' tab.
- At the top of the Twin Graph pane, click the 'Import Graph' button
- In the file dialog, navigate to the file:
./sandbox/AzureDigitalTwins/logistics-twin.xlsx
- Click Upload. The initial twin graph should load
- On the 'Graph Preview Only' screen, click the save icon in the top right-hand corner.
- The system will now process the graph, loading the nodes into ADT.
- This should process OK. If there are any errors, use the Output screen to determine where the errors may be occuring.
- Once processed, you can view a visualisation of the digital twins by returning to the 'TWIN GRAPH' tab and clicking 'Run Query' in the top right-hand corner.
- You can switch layout style for your graph using the 'Choose Layout' button at the top of the 'TWIN GRAPH' pane.
- Note that we have included a number of Azure IoT Plug-and-PLay devices into this sample digital twins. One such device is shown in detail on the attached image - an MXCHIP AZ3166 device named 'PnP-Sensors', contains multiple sensors, and will be the subject of auto-provisiioning later.
Now that we have created our digital twins of assets and spaces, we can add digital twins of the processes we wish to model. For our sandbox, we will model an end-to-end supply chain process, incorporating the manufacturing assets and spaces we have previously modelled. To create digital twins of an end-to-end supply chain, execute the following steps:
- In Azure Digital Twins Explorer, in the 'TWIN GRAPH' pane, click the 'Import Graph' button
- Select the file:
./sandbox/AzureDigitalTwins/logistics-supply-chain-twin.xlsx
- After the twins have loaded, click the Save icon in the top right-hand corner
- The system will process the supply chain twins
- To display only the supply chain process twins, copy the following SQL Query into the Azure Digitral Twins Explorer query field. This query is filtering the view to only those nodes and their relationships which inherit from the Supply Chain interface.
SELECT * FROM DIGITALTWINS WHERE IS_OF_MODEL('dtmi:isa95:core:SupplyChain;1')
- Run the query by clicking the 'Run Query' button.
To update a twin once it is created, you can either use the tools within the Azure Digital Twins explorer or prefereably refer to this article about using the ADT API Here, we will explain how to use the tools in Azure Digital Twins Explorer to make a simple change to an existing twin. Execute the following steps:
- In Azure Digital Twins Explorer, run the following SQL Query to return only assets and their relationships:
SELECT * FROM DIGITALTWINS WHERE IS_OF_MODEL('dtmi:isa95:core:Asset;1')
- In the results view, find and click on the node for our simulated device 'sim000001'
- Notice in the PROPERTIES panel on the right-hand side, there is a Property of the 'sim000001' twin called 'Name' for which the value is 'undefined'
- Edit the 'Name' property to be 'sim000001'.
- Notice that, after making this change, there is a record of the change added to the digital twin properties. All changes to digital twins are audited.
In order to synchronize data between features within our sandbox, we require some light Azure Functions. These we have grouped under a concept called TwinSync, a general principle to use Azure Functions where possible to execute data transformation code outside of any user context. These functions are descrete, low cost, highly scalable and reliable execution actions. The trigger is normally a standard Azure event occuring, such as a message arriving on an Event Grid topic or Event Hub.
We will use TwinSync functions to achieve the following:
- to sync IoTHub telemetry with Azure Digital Twins
- to sync Azure Digital Twins data with AnyLogic simulations
- to sync Azure Digital Twins data with Databricks simulations
For example, we have the following Function code available in a TwinSync Function named IoTHubtoTwins:
using System;
using Azure;
using System.Net.Http;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Avanade.Japan.Device.Simulation.Sync
{
public class IoTHubtoTwins
{
private static readonly string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL");
private static readonly HttpClient httpClient = new HttpClient();
[FunctionName("IoTHubtoTwins")]
public async void Run([EventGridTrigger] EventGridEvent eventGridEvent, ILogger log)
{
if (adtInstanceUrl == null) log.LogError("Application setting \"ADT_SERVICE_URL\" not set");
try
{
// Authenticate with Digital Twins
var cred = new ManagedIdentityCredential("https://digitaltwins.azure.net");
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions { Transport = new HttpClientTransport(httpClient) });
log.LogInformation($"ADT service client connection created.");
if (eventGridEvent != null && eventGridEvent.Data != null)
{
log.LogInformation(eventGridEvent.Data.ToString());
// <Find_device_ID_and_values>
JObject deviceMessage = (JObject)JsonConvert.DeserializeObject(eventGridEvent.Data.ToString());
string deviceId = (string)deviceMessage["systemProperties"]["iothub-connection-device-id"];
var temperature = deviceMessage["body"]["temperature"];
var humidity = deviceMessage["body"]["humidity"];
var power = deviceMessage["body"]["power"];
var vibration = deviceMessage["body"]["vibration"];
var uptime = deviceMessage["body"]["uptime"];
var capacity = deviceMessage["body"]["capacity"];
var wait = deviceMessage["body"]["wait"];
var delay = deviceMessage["body"]["delay"];
var arrivalRate = deviceMessage["body"]["arrivalRate"];
var counter = deviceMessage["body"]["Counter"].Value<int>(); // counter must be int
// </Find_device_ID_and_values>
// Correct integer telemetry values by dividing by 100
Random rnd = new Random();
int randomFactor = rnd.Next(3, 18); // a random integer from 3 to 17
var seed = (counter % 99) % randomFactor; // creates a seed value using the random factor, more frequent occurences at lower end
var t = temperature.Value<double>() / 100;
var h = humidity.Value<double>() / 100;
var p = power.Value<double>() / 100;
var v = vibration.Value<double>() / 100;
var u = uptime.Value<double>() / 100;
var c = (int)Math.Round(capacity.Value<double>() / 100); // round capacity to the nearest integer, even integer if midpoint
var w = wait.Value<double>() / 100;
var d = delay.Value<double>() / 100;
var ar = (17*Math.Sin(arrivalRate.Value<double>() / 100) + seed) / 15; // creates a more realistic arrival rate curve
log.LogInformation($"Device:{deviceId} Counter: [{counter}]");
log.LogInformation($"Device:{deviceId} Temperature: {t}");
log.LogInformation($"Device:{deviceId} Humidity: {h}");
log.LogInformation($"Device:{deviceId} Power: {p}");
log.LogInformation($"Device:{deviceId} Vibration: {v}");
log.LogInformation($"Device:{deviceId} Uptime: {u}");
log.LogInformation($"Device:{deviceId} Capacity: {c}");
log.LogInformation($"Device:{deviceId} Wait: {w}");
log.LogInformation($"Device:{deviceId} Delay: {d}");
log.LogInformation($"Device:{deviceId} ArrivalRate: {ar}");
// <Update_twin_with_device_values>
var updateTwinData = new JsonPatchDocument();
updateTwinData.AppendReplace("/Temperature", t);
updateTwinData.AppendReplace("/Humidity", h);
updateTwinData.AppendReplace("/Power", p);
updateTwinData.AppendReplace("/Vibration", v);
updateTwinData.AppendReplace("/Uptime", u);
updateTwinData.AppendReplace("/Capacity", c);
updateTwinData.AppendReplace("/Wait", w);
updateTwinData.AppendReplace("/Delay", d);
updateTwinData.AppendReplace("/ArrivalRate", ar);
await client.UpdateDigitalTwinAsync(deviceId, updateTwinData);
await client.PublishTelemetryAsync(deviceId,null, new JObject
{
{ "Temperature", t },
{ "Humidity", h },
{ "Power", p },
{ "Vibration", v },
{ "Uptime", u },
{ "Capacity", c },
{ "Wait", w },
{ "Delay", d },
{ "ArrivalRate", ar }
}.ToString());
// </Update_twin_with_device_values>
}
}
catch (Exception ex)
{
log.LogError($"Error in ingest function: {ex.Message} | {ex.InnerException} | {ex.StackTrace}");
}
}
}
}
Once the events are flowing to our function app, they should be flowing into our ADT twins. To verify the end-to-end flow, execute the following steps:
- Verify that telemetry events are flowing to our function
- Verify that the function is processing the telemetry events without errors
- Verify that properties of ADT sensor twins are being updated
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Azure;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.Devices.Provisioning.Service;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Samples.AdtIothub
{
public static class DpsAdtAllocationFunc
{
private const string adtAppId = "https://digitaltwins.azure.net";
private static string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL");
private static readonly HttpClient singletonHttpClientInstance = new HttpClient();
[FunctionName("DpsAdtAllocationFunc")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req, ILogger log)
{
// Get request body
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
log.LogDebug($"Request.Body: {requestBody}");
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Get registration ID of the device
string regId = data?.deviceRuntimeContext?.registrationId;
bool fail = false;
string message = "Uncaught error";
var response = new ResponseObj();
// Must have unique registration ID on DPS request
if (regId == null)
{
message = "Registration ID not provided for the device.";
log.LogInformation("Registration ID: NULL");
fail = true;
}
else
{
string[] hubs = data?.linkedHubs.ToObject<string[]>();
// Must have hubs selected on the enrollment
if (hubs == null
|| hubs.Length < 1)
{
message = "No hub group defined for the enrollment.";
log.LogInformation("linkedHubs: NULL");
fail = true;
}
else
{
// Find or create twin based on the provided registration ID and model ID
dynamic payloadContext = data?.deviceRuntimeContext?.payload;
string dtmi = payloadContext.modelId;
log.LogDebug($"payload.modelId: {dtmi}");
string dtId = await FindOrCreateTwinAsync(dtmi, regId, log);
// Get first linked hub (TODO: select one of the linked hubs based on policy)
response.iotHubHostName = hubs[0];
// Specify the initial tags for the device.
var tags = new TwinCollection();
tags["dtmi"] = dtmi;
tags["dtId"] = dtId;
// Specify the initial desired properties for the device.
var properties = new TwinCollection();
// Add the initial twin state to the response.
var twinState = new TwinState(tags, properties);
response.initialTwin = twinState;
}
}
log.LogDebug("Response: " + ((response.iotHubHostName != null)? JsonConvert.SerializeObject(response) : message));
return fail
? new BadRequestObjectResult(message)
: (ActionResult)new OkObjectResult(response);
}
public static async Task<string> FindOrCreateTwinAsync(string dtmi, string regId, ILogger log)
{
// Create Digital Twins client
var cred = new ManagedIdentityCredential(adtAppId);
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions
{
Transport = new HttpClientTransport(singletonHttpClientInstance)
});
// Find existing DigitalTwin with registration ID
try
{
// Get DigitalTwin with Id 'regId'
BasicDigitalTwin existingDt = await client.GetDigitalTwinAsync<BasicDigitalTwin>(regId).ConfigureAwait(false);
// Check to make sure it is of the correct model type
if (StringComparer.OrdinalIgnoreCase.Equals(dtmi, existingDt.Metadata.ModelId))
{
log.LogInformation($"DigitalTwin {existingDt.Id} already exists");
return existingDt.Id;
}
// Found DigitalTwin but it is not of the correct model type
log.LogInformation($"Found DigitalTwin {existingDt.Id} but it is not of model {dtmi}");
}
catch(RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
{
log.LogDebug($"Did not find DigitalTwin {regId}");
}
// Either the DigitalTwin was not found, or we found it but it is of a different model type
// Create or replace it with what it needs to be, meaning if it was not found a brand new DigitalTwin will be created
// and if it was of a different model, it will replace that existing DigitalTwin
// If it was intended to only create the DigitalTwin if there is no matching DigitalTwin with the same Id,
// ETag.All could have been used as the ifNonMatch parameter to the CreateOrReplaceDigitalTwinAsync method call.
// Read more in the CreateOrReplaceDigitalTwinAsync documentation here:
// https://docs.microsoft.com/en-us/dotnet/api/azure.digitaltwins.core.digitaltwinsclient.createorreplacedigitaltwinasync?view=azure-dotnet
BasicDigitalTwin dt = await client.CreateOrReplaceDigitalTwinAsync(
regId,
new BasicDigitalTwin
{
Metadata = { ModelId = dtmi },
Contents =
{
{ "Temperature", 0.0 },
{
"deviceInformation",
new BasicDigitalTwinComponent
{
Metadata = {},
Contents =
{
{ "manufacturer", "MXCHIP" }
}
}
}
}
}
).ConfigureAwait(false);
log.LogInformation($"Digital Twin {dt.Id} created.");
return dt.Id;
}
}
/// <summary>
/// Expected function result format
/// </summary>
public class ResponseObj
{
public string iotHubHostName { get; set; }
public TwinState initialTwin { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Samples.AdtIothub
{
public static class DeviceTelemetryToTwinFunc
{
private static string adtAppId = "https://digitaltwins.azure.net";
private static readonly string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL", EnvironmentVariableTarget.Process);
private static readonly HttpClient singletonHttpClientInstance = new HttpClient();
[FunctionName("DeviceTelemetryToTwinFunc")]
public static async Task Run(
[EventHubTrigger("deviceevents", Connection = "EVENTHUB_CONNECTIONSTRING")] EventData eventData, ILogger log)
{
log.LogInformation($"C# function triggered to process a message: {eventData}");
log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(eventData.Body)}");
// Metadata accessed by binding to EventData
log.LogInformation($"EnqueuedTimeUtc={eventData.SystemProperties.EnqueuedTimeUtc}");
log.LogInformation($"SequenceNumber={eventData.SystemProperties.SequenceNumber}");
log.LogInformation($"Offset={eventData.SystemProperties.Offset}");
//var exceptions = new List<Exception>(events.Length);
// Create Digital Twin client
var cred = new ManagedIdentityCredential(adtAppId);
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions
{
Transport = new HttpClientTransport(singletonHttpClientInstance)
});
try
{
log.LogInformation($"EventData: {System.Text.Json.JsonSerializer.Serialize(eventData)}");
// Get message body
string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
log.LogInformation($"MessageBody: {messageBody}");
// Reading Device ID from message headers
JObject jbody = (JObject)JsonConvert.DeserializeObject(messageBody);
string deviceId = eventData.SystemProperties["iothub-connection-device-id"].ToString();
log.LogInformation($"DeviceId: {deviceId}");
// Extracting temperature from device telemetry
double temperature = Convert.ToDouble(jbody["temperature"].ToString());
// Update device Temperature property
var updateTwinData = new JsonPatchDocument();
updateTwinData.AppendReplace("/Temperature", temperature);
log.LogInformation($"ADT Patch Document: {updateTwinData.ToString()}");
await client.UpdateDigitalTwinAsync(deviceId, updateTwinData);
log.LogInformation($"Updated Temperature of device Twin {deviceId} to: {temperature}");
}
catch (Exception e)
{
// We need to keep processing the rest of the batch - capture this exception and continue.
log.LogError(e, "Function error");
}
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Azure;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace Samples.AdtIothub
{
public static class DeleteDeviceInTwinFunc
{
private static string adtAppId = "https://digitaltwins.azure.net";
private static readonly string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL", EnvironmentVariableTarget.Process);
private static readonly HttpClient singletonHttpClientInstance = new HttpClient();
[FunctionName("DeleteDeviceInTwinFunc")]
public static async Task Run(
[EventHubTrigger("lifecycleevents", Connection = "EVENTHUB_CONNECTIONSTRING")] EventData[] events, ILogger log)
{
var exceptions = new List<Exception>(events.Length);
// Create Digital Twin client
var cred = new ManagedIdentityCredential(adtAppId);
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions
{
Transport = new HttpClientTransport(singletonHttpClientInstance)
});
foreach (EventData eventData in events)
{
try
{
//log.LogDebug($"EventData: {System.Text.Json.JsonSerializer.Serialize(eventData)}");
string opType = eventData.Properties["opType"] as string;
if (opType == "deleteDeviceIdentity")
{
string deviceId = eventData.Properties["deviceId"] as string;
try
{
// Find twin based on the original Registration ID
BasicDigitalTwin digitalTwin = await client.GetDigitalTwinAsync<BasicDigitalTwin>(deviceId);
// In order to delete the twin, all relationships must first be removed
await DeleteAllRelationshipsAsync(client, digitalTwin.Id, log);
// Delete the twin
await client.DeleteDigitalTwinAsync(digitalTwin.Id, digitalTwin.ETag);
log.LogInformation($"Twin {digitalTwin.Id} deleted in DT");
}
catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.NotFound)
{
log.LogWarning($"Twin {deviceId} not found in DT");
}
}
}
catch (Exception e)
{
// We need to keep processing the rest of the batch - capture this exception and continue.
exceptions.Add(e);
}
}
if (exceptions.Count > 1)
throw new AggregateException(exceptions);
if (exceptions.Count == 1)
throw exceptions.Single();
}
/// <summary>
/// Deletes all outgoing and incoming relationships from a specified digital twin
/// </summary>
public static async Task DeleteAllRelationshipsAsync(DigitalTwinsClient client, string dtId, ILogger log)
{
AsyncPageable<BasicRelationship> relationships = client.GetRelationshipsAsync<BasicRelationship>(dtId);
await foreach (BasicRelationship relationship in relationships)
{
await client.DeleteRelationshipAsync(dtId, relationship.Id, relationship.ETag);
log.LogInformation($"Twin {dtId} relationship {relationship.Id} deleted in DT");
}
AsyncPageable<IncomingRelationship> incomingRelationships = client.GetIncomingRelationshipsAsync(dtId);
await foreach (IncomingRelationship incomingRelationship in incomingRelationships)
{
await client.DeleteRelationshipAsync(incomingRelationship.SourceId, incomingRelationship.RelationshipId);
log.LogInformation($"Twin {dtId} incoming relationship {incomingRelationship.RelationshipId} from {incomingRelationship.SourceId} deleted in DT");
}
}
}
}
The Azure Device Provisioning Service (DPS) for IoT Hub automatically provisions registered devices so that they will communicate with the correct IoT Hub. The DPS can also be customized to do additional tasks. In our case, we will customize it to also create a Twin in Azure Digital Twins for every device provisioned.
- When the device is freshly manufactured, it is configured to first communicate with the DPS.
- The DPS then validates the device by checking if the device is in the enrollment list.
- Custom: If the device is deemed valid, the Azure Function (DpsAdtAllocationFunc) creates a Twin for the device in Azure Digital Twins.
- If the device is deemed valid, it is added as a device in the appropriate IoTHub.
- The device is given back connection details for the appropriate IoTHub where it will then start sending telemetry data.
The DPS acts like a 'front desk' in a hotel setting, where it identifies the 'guest' (device) and if the 'guest' has a 'booking' / 'reservation' (enrollment) directs him/her to the appropriate 'room' (IoTHub).
*** Setting up a DPS ***
- Create a resource 'Device Provisioning Services'. Provide the appropriate Subscription, Resource Group, Name, and Region.
- Once it is done creating, go to the resource and select 'Linked IoT Hubs'. In here, we will add the IoTHub/s where our devices will be directed to.
- Provide the details of the IoTHub to be added then click 'Save'.
- Go to 'Manage Enrollments' then select 'Add Individual Enrollment'. In here, we will add an individual device to the enrollment list.
- Fill in the details of the enrollment:
- Use 'Symmetric Key' and provide a 'Registration ID' for the device. Copy and save the Registration ID since it will be used later when configuring the IoT Device.
- Select 'Custom (Use Azure Function)' in assigning devices to hubs, since we will run 'DpsAdtAllocationFunc'. Also make sure the device will be assigned to the correct IoTHub if you have multiple IoTHubs.
- Select the provisioned 'DpsAdtAllocationFunc' then save the enrollment.
- Click on the newly enrolled device then copy and save the 'Primary Key'. This will be used later when configuring the IoT Device.
- Go to 'Overview' then copy and save 'Service Endpoint' and 'ID Scope'. These will also be used in configuring the IoT Device.
Next we will be setting up a simulated device that's supposedly freshly manufactured. The simulator can be run on your local machine.
*** Setting up a simulated freshly manufactured device ***
- Install nodejs on your local machine. https://nodejs.org/en/download/
- The IoT device simulator can be found in
./sandbox/PnPDevices/azure-iot-rpisimulator
- insert the 'Service Endpoint' in 'provisioningHost'
- insert the 'ID Scope' in 'idScope'
- insert the device's registration id in 'registrationId'
- insert the primary symmetric key for the device in 'symmetricKey'
- the 'modelId' corresponds to the MXCHIP AZ3166 multi-sensor device model in Azure Digital Twins. (don't change it)
- Run the ff. command in the IoT device simulator directory:
npm install
*** Testing the DPS ***
- Open Azure Digital Twins and run this query.
SELECT * FROM DIGITALTWINS WHERE IS_OF_MODEL('dtmi:azurertos:devkit:gsgmxchip;2')
- Currently, only one MXCHIP AZ3166 named 'PnP-Sensors' has an existing twin. This was the one imported to ADT earlier.
- Run the simulator using the ff. command:
npm start
- The logs should look like this if successfully run.
- An error message will appear if there is a problem in provisioning. This is usually a problem in the Azure Function configuration and role.
- The ff. logs should appear when the simulated device is successfully provisioned and is sending telemetry data to the correct IoTHub.
- Run the query in step 1 again in Azure Digital Twins. The simulated device should now appear in the twin graph.
Azure EventHub is a service for ingesting streams of data from various sources including IoT telemetry. This data can be used by various consumers for further processing and analysis. In our case, we will be using EventHub to:
- Send telemetry data to ADT to update twins (DeviceTelemetryToTwinFunc). This is somewhat similar to TwinSync, except now, telemetry data is coming from EventHub instead of EventGrid.
- Delete a device's twin in ADT whenever the device is deleted in IoTHub (DeleteDeviceInTwinFunc).
In the following steps, we'll be showing how to setup an event hub for use case 1, i.e., for the DeviceTelemetryToTwinFunc function. You can use the same steps for setting up use case 2 (DeleteDeviceInTwinFunc).
*** Creating an EventHub Namespace ***
- In order to create an event hub, we first need an event hubs namespace. Create a resource named 'Event Hubs' and fill in the appropriate Subscription, Resource Group, Location, and Pricing Tier. Also provide a namespace name.
- Once the eventhub namespace is created, we'll need to add a policy so that we can read messages being sent to the EventHubs belonging to this namespace. Go to the resource, select 'Shared access policies', and click 'Add' to add a new SAS Policy.
- Provide a name for the policy, then select the 'Listen' checkbox. This allows us to read events coming in to the EventHubs in this namespace.
- Once the SAS policy is created, click it, copy the 'Connection string-primary key', then save this connection string. This is the connection string that will be used by our Azure Functions to read data from the EventHubs.
*** Creating an EventHub ***
- Go to 'Event Hubs' then click '+ Event Hub'.
- Give the EventHub a name then click 'Create'. Make sure this name matches with the EventHub name in the Azure Function EventHubTrigger Attribute.
*** Setting up Azure Functions ***
- The only thing we need to setup in Azure Functions is the connection string which will give the function read access to EventHubs. In Azure Functions, select 'Configuration'.
- In here we'll add a new application setting that corresponds to the event hub connection string.
- Fill in the name of the application setting and the value of the connection string. The name must match the application setting name being referred to in the code.
- Click 'OK' then click 'Save' to restart the function app.
*** Setting up IoTHub ***
In IoTHub, we need to route messages towards the appropriate EventHub. To do this, we'll use IoTHub's built-in message routing.
- Go to the IoTHub instance, click on 'Message Routing' and select 'Custom Endpoints'. In here we will add our EventHubs as custom endpoints where messages will be sent. Click '+Add' and select 'Event hubs'.
- Provide a name for the custom endpoint and specify the EventHub namespace and instance. (do the same for 'lifecycleevents')
- Go to 'Routes' and click on '+ Add'.
- Provide a name for the route and specify appropriate custom endpoint and data source. (for lifecycleevents, select 'Device Lifecycle Events')
- You could also specify a routing query which will filter the messages being routed. For both 'deviceevents' and 'lifecycleevents', we'll just use the default query 'true'. This means no filter will be applied and all messages will be routed.
- Click 'Save'.
We can verify that we are updating our twin using the telemetry data from the simulated device. In ADT, we can see that our provisioned device initially has 0 for the value of temperature.
After running the simulated device, we should get the following logs for 'DeviceTelemetryToTwinFunc' (enable 'Application Insights' to see function logs). We should see that the temperature is being set to the value 2195.
Upon reloading the query in ADT, we can see that the value for temperature of the twin is also updated.
Given that we've already added an EventHub for 'lifecycleevents', we can now perform auto-retiring on the simulated device, i.e., when a device is deleted in IoTHub, its twin in ADT should also be deleted. To test this, we'll simply delete our provisioned simulated device from IoTHub.
We can verify in the DeleteDeviceInTwinFunc function logs that deletion on ADT was performed.
We can also verify in ADT that our simulated device's twin no longer exists.
- Create a Databricks workspace
- Create a new Python Notebook
- Add the following codeblock in a new cell in the Notebook. This codeblock installs the required ADT and Azure Identity Python packages
%pip install azure-digitaltwins-core azure-identity
- Add the following codeblock in a new cell under the previous. This codeblock creates the ADT client and uses it to query models or to query digital twins matching a relationship to another digital twin.
# Establish a client connection to ADT
# DefaultAzureCredential supports different authentication mechanisms and determines the appropriate credential type based of the environment it is executing in.
# It attempts to use multiple credential types in an order until it finds a working credential.
import os
import azure.identity
import azure.digitaltwins.core
# - AZURE_ADT_URL: The URL to the ADT in Azure
url = os.getenv("AZURE_ADT_URL")
# DefaultAzureCredential expects the following three environment variables:
# - AZURE_TENANT_ID: The tenant ID in Azure Active Directory
# - AZURE_CLIENT_ID: The application (client) ID registered in the AAD tenant
# - AZURE_CLIENT_SECRET: The client secret for the registered application
credential = azure.identity.DefaultAzureCredential()
service_client = azure.digitaltwins.core.DigitalTwinsClient(url, credential)
# List ADT models
# listed_models = service_client.list_models()
# for model in listed_models:
# print(model)
# print()
# print()
# Query ADT for the Process Twin
query_expression ='SELECT Station FROM DIGITALTWINS Station JOIN Process RELATED Station.isStepOf Relationship WHERE Process.$dtId = \'pp_BodyPanelProduction_01\''
query_result = service_client.query_twins(query_expression)
print('DigitalTwins:')
for twin in query_result:
print(twin)
- Add the following codeblock in a new cell under the previous. This codeblock uses the ADT client to query the telemetry of one sensor, obtaining a value every 5 seconds.
# Query ADT for the Process Delay telemetry for the Coating Station
import time
query_expression = 'SELECT Device.Delay FROM DigitalTwins Device WHERE $dtId=\'sim000001\''
txt = '{}:{}'
for x in range(49):
query_result = service_client.query_twins(query_expression)
for value in query_result:
print(txt.format(x,value))
time.sleep(5)
- Create a new Cluster
- Add ADT Environment Variables to the Cluster
AZURE_ADT_URL=<The URL for your ADT in Azure, including 'https://'>
AZURE_TENANT_ID=<The tenant ID of your Azure Active Directory>
AZURE_CLIENT_ID=<The application (client) ID registered in the AAD tenant>
AZURE_CLIENT_SECRET=<The client secret for the registered application>
- Start the Cluster
- Attach the Notebook to the Cluster
- Execute each cell until you see ADT telemetry streaming
- Add an Interface document to the Databricks Workspace
- Add the following cells to the Notebook
- Create a new Brain in Microsoft Bonsai
- Add Inkling code to the new Brain
- Train the Brain using the registered Databricks Simulation
- Observe the Brain as teaching takes place
- Add the following environment variables to the Cluster, by first stopping the Cluster, editing the variables, and then restarting the Cluster.
SIM_WORKSPACE=<the workspace ID from your Bonsai workspace>
SIM_ACCESS_KEY=<the access key from your Bonsai workspace>
- Detach and re-attach the Notebook from the Cluster
- Add the following codeblock to the Notebook in a new cell. This codeblock installs the required package to connect with Microsoft Bonsai.
%pip install git+https://github.com/microsoft/bonsai-common
- Add the following codeblock to the Notebook in a new cell. This codeblock checks that the path to the Interface document is correct.
interface_file_path = "/FileStore/tables/cartpole-py/cartpole_interface.json"
display(dbutils.fs.ls("dbfs:" + interface_file_path))
- Add the following codeblock to the Notebook in a new cell. This codeblock is the Python simulation. It includes code which registers the simulation
"""
Classic cart-pole system implemented by Rich Sutton et al.
Derived from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""
__copyright__ = "Copyright 2020, Microsoft Corp."
# pyright: strict
import math
import os
import random
import sys
import json
from bonsai_common import SimulatorSession, Schema
from microsoft_bonsai_api.simulator.client import BonsaiClientConfig
from microsoft_bonsai_api.simulator.generated.models import SimulatorInterface
# Constants
GRAVITY = 9.8 # a classic...
CART_MASS = 0.31 # kg
POLE_MASS = 0.055 # kg
TOTAL_MASS = CART_MASS + POLE_MASS
POLE_HALF_LENGTH = 0.4 / 2 # half the pole's length in m
POLE_MASS_LENGTH = POLE_MASS * POLE_HALF_LENGTH
FORCE_MAG = 1.0
STEP_DURATION = 0.02 # seconds between state updates (20ms)
TRACK_WIDTH = 1.0 # m
FORCE_NOISE = 0.02 # % of FORCE_MAG
# Model parameters
class CartPoleModel(SimulatorSession):
def reset(
self,
initial_cart_position: float = 0,
initial_pole_angle: float = 0,
target_pole_position: float = 0,
):
# cart position (m)
self._cart_position = initial_cart_position
# cart velocity (m/s)
self._cart_velocity = 0
# cart angle (rad)
self._pole_angle = initial_pole_angle
# pole angular velocity (rad/s)
self._pole_angular_velocity = 0
# pole position (m)
self._pole_center_position = 0
# pole velocity (m/s)
self._pole_center_velocity = 0
# target pole position (m)
self._target_pole_position = target_pole_position
def step(self, command: float):
# We are expecting the input command to be -1 or 1,
# but we'll support a continuous action space.
# Add a small amount of random noise to the force so
# the policy can't succeed by simply applying zero
# force each time.
force = FORCE_MAG * (command + random.uniform(-0.02, 0.02))
cosTheta = math.cos(self._pole_angle)
sinTheta = math.sin(self._pole_angle)
temp = (
force + POLE_MASS_LENGTH * self._pole_angular_velocity ** 2 * sinTheta
) / TOTAL_MASS
angularAccel = (GRAVITY * sinTheta - cosTheta * temp) / (
POLE_HALF_LENGTH * (4.0 / 3.0 - (POLE_MASS * cosTheta ** 2) / TOTAL_MASS)
)
linearAccel = temp - (POLE_MASS_LENGTH * angularAccel * cosTheta) / TOTAL_MASS
self._cart_position = self._cart_position + STEP_DURATION * self._cart_velocity
self._cart_velocity = self._cart_velocity + STEP_DURATION * linearAccel
self._pole_angle = (
self._pole_angle + STEP_DURATION * self._pole_angular_velocity
)
self._pole_angular_velocity = (
self._pole_angular_velocity + STEP_DURATION * angularAccel
)
# Use the pole center, not the cart center, for tracking
# pole center velocity.
self._pole_center_position = (
self._cart_position + math.sin(self._pole_angle) * POLE_HALF_LENGTH
)
self._pole_center_velocity = (
self._cart_velocity
+ math.sin(self._pole_angular_velocity) * POLE_HALF_LENGTH
)
def halted(self):
# If the pole has fallen past 45 degrees, there's no use in continuing.
return abs(self._pole_angle) >= math.pi / 4
def state(self):
return {
"cart_position": self._cart_position,
"cart_velocity": self._cart_velocity,
"pole_angle": self._pole_angle,
"pole_angular_velocity": self._pole_angular_velocity,
"pole_center_position": self._pole_center_position,
"pole_center_velocity": self._pole_center_velocity,
"target_pole_position": self._target_pole_position,
}
# Callbacks
def get_state(self):
return self.state()
def get_interface(self) -> SimulatorInterface:
with open("/dbfs" + interface_file_path, "r") as file:
json_interface = file.read()
interface = json.loads(json_interface)
return SimulatorInterface(
name=interface["name"],
timeout=interface["timeout"],
simulator_context=self.get_simulator_context(),
description=interface["description"],
)
def episode_start(self, config: Schema):
self.reset(
config.get("initial_cart_position") or 0,
config.get("initial_pole_angle") or 0,
config.get("target_pole_position") or 0,
)
def episode_step(self, action: Schema):
self.step(action.get("command") or 0)
if __name__ == "__main__":
config = BonsaiClientConfig(argv=sys.argv)
cartpole = CartPoleModel(config)
cartpole.reset()
while cartpole.run():
continue